You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/15 15:01:37 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

mumrah commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r670506521



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value `$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version `$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"
+    }
+
+    @nowarn("cat=deprecation")
+    def brokerWarningMessage: String = {
+      s"Broker configuration ${KafkaConfig.LogMessageFormatVersionProp} with value $messageFormatVersionString is ignored " +
+        s"because the inter-broker protocol version `$interBrokerProtocolVersionString` is greater or equal than 3.0"

Review comment:
       nit: alignment (indentation of the continued string lines in `brokerWarningMessage`)

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value `$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version `$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"

Review comment:
       nit: alignment (indentation of the continued string lines in `topicWarningMessage`)

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -253,9 +259,10 @@ class LogManager(logDirs: Seq[File],
                            hadCleanShutdown: Boolean,
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
+                           defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigOverrides.getOrElse(topicPartition.topic, currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)

Review comment:
       The addition of this argument here (and elsewhere in LogManager) is to let us go through the validation/warning logic during startup. Is that right?
   
   Are the other usages of `currentDefaultConfig` safe as-is? What happens if someone reconfigures the log config? Is that handled via `LogManager#fetchLogConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org