You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/13 14:52:36 UTC

[GitHub] [kafka] ijuma opened a new pull request #11036: KAFKA-12944: Always write record batches with v2 (KIP-724)

ijuma opened a new pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   TBD
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882511896


   I'll go ahead and merge. If there are further comments from @mumrah or @hachikuji, I'll address in a follow up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671565901



##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -270,7 +271,8 @@ object ConfigCommand extends Config {
     }
     if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
       println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
-        s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
+        "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +

Review comment:
       Sure, sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882589793


   Thanks @ijuma, I don't have any more questions or follow-ups 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882589793


   Thanks @ijuma, I don't have any more questions or follow-ups 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671388968



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       You mean if someone is using trunk with IBP 3.0-IV1, is using the old message format and doesn't want the new behavior? Seems very unlikely, but it's a fair point. I can introduce `IV2` to be avoid it altogether.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672245627



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       Yeah, I do agree with you. I don't think that anyone already issues `3.0-IV1` anyway.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       For my own education, could `Properties` by unmodifiable somehow? 
   
   I pointed that out because the scaladoc of the `ConfigRepository` trait states the following `@return a copy of the configuration for the given resource`. It seems that the two main implementations respect this however `MockConfigRepository` does not. Therefore, I tend to agree with you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671370260



##########
File path: core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
##########
@@ -103,4 +109,38 @@ class AbstractPartitionTest {
       TestUtils.clearYammerMetrics()
     }
   }
+
+  protected def setupPartitionWithMocks(leaderEpoch: Int,
+                                      isLeader: Boolean): Partition = {

Review comment:
       nit: Alignement.

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -152,7 +149,7 @@ class LocalLogTest {
     val oldConfig = log.config
     assertEquals(oldConfig, log.config)
 
-    val newConfig = createLogConfig(segmentBytes=oldConfig.segmentSize + 1)
+    val newConfig = LogTestUtils.createLogConfig(segmentBytes=oldConfig.segmentSize + 1)

Review comment:
       nit: While here, could we add a space before and after `=`?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       It seems that we get a copy of the config here so I suppose that we could directly mutate it below instead of making a copy again. What do you think?

##########
File path: core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
##########
@@ -163,6 +163,25 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     records
   }
 
+
+  /**
+   * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
+   * records to each partition
+   */
+  protected def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                topicName: String,

Review comment:
       nit: Could we align the arguments?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +

Review comment:
       It seems that either the sentence is not finished or `if inter-broker` is not necessary.

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       I wonder if we should introduce `KAFKA_3_0_IV2` to be safe here. I don't think anyone already uses `KAFKA_3_0_IV1` but what would happen if one does? 

##########
File path: core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+
+import java.util
+import java.util.{Optional, Properties}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class BaseFetchRequestTest extends BaseRequestTest {
+
+  protected var producer: KafkaProducer[String, String] = null
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  protected def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long],
+                                 version: Short): FetchRequest = {
+    val topicIds = getTopicIds().asJava
+    FetchRequest.Builder.forConsumer(version, Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap), topicIds)
+      .setMaxBytes(maxResponseBytes).build()
+  }
+
+  protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {

Review comment:
       nit: Alignment.

##########
File path: core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+
+import java.util
+import java.util.{Optional, Properties}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class BaseFetchRequestTest extends BaseRequestTest {
+
+  protected var producer: KafkaProducer[String, String] = null
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  protected def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long],
+                                 version: Short): FetchRequest = {

Review comment:
       nit: Alignment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882589793


   Thanks @ijuma, I don't have any more questions or follow-ups 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671389668



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Yes, I thought of that, but we don't specify that the result of `topicConfig` is mutable and we often return unmodifiable collections. Even as it is, we don't do the copy in the common case, so that's why I went with it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882511896






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671608959



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       On second thought, this is going to be very confusing since we already have `3.1-IV0`. I think we're better off just going with `3.0-IV1`. The change here is benign in general and the IBP check is us being super careful. The number of people using IBP `3.0-IV1` is probably 0, but if there are some, they are not likely to be using the ancient record formats 0 and 1 in such clusters for new records.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882554954


   It occurred to me that some system tests may require changes. I'll check the nightly results and submit a follow-up PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #11036:
URL: https://github.com/apache/kafka/pull/11036


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r670555452



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -253,9 +259,10 @@ class LogManager(logDirs: Seq[File],
                            hadCleanShutdown: Boolean,
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
+                           defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigOverrides.getOrElse(topicPartition.topic, currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)

Review comment:
       It's to handle topic configs read during startup. The ones updated dynamically go via TopicConfigHandler. The dynamic broker config code never allowed log.message.format.version to be set. There are tests for all the cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-881058902


   JDK 16 build had a bit of flakiness, but the rest looked good. I kicked off another build just in case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r670506521



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value `$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version `$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"
+    }
+
+    @nowarn("cat=deprecation")
+    def brokerWarningMessage: String = {
+      s"Broker configuration ${KafkaConfig.LogMessageFormatVersionProp} with value $messageFormatVersionString is ignored " +
+        s"because the inter-broker protocol version `$interBrokerProtocolVersionString` is greater or equal than 3.0"

Review comment:
       nit: alignment

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1
+
+  class MessageFormatVersion(messageFormatVersionString: String, interBrokerProtocolVersionString: String) {
+    val messageFormatVersion = ApiVersion(messageFormatVersionString)
+    private val interBrokerProtocolVersion = ApiVersion(interBrokerProtocolVersionString)
+
+    def shouldIgnore: Boolean = shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)
+
+    def shouldWarn: Boolean =
+      interBrokerProtocolVersion >= KAFKA_3_0_IV1 && messageFormatVersion.recordVersion.precedes(RecordVersion.V2)
+
+    @nowarn("cat=deprecation")
+    def topicWarningMessage(topicName: String): String = {
+      s"Topic configuration ${LogConfig.MessageFormatVersionProp} with value `$messageFormatVersionString` is ignored " +
+        s"for `$topicName` because the inter-broker protocol version `$interBrokerProtocolVersionString` is " +
+        "greater or equal than 3.0"

Review comment:
       nit: alignment

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -253,9 +259,10 @@ class LogManager(logDirs: Seq[File],
                            hadCleanShutdown: Boolean,
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
+                           defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigOverrides.getOrElse(topicPartition.topic, currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)

Review comment:
       The addition of this argument here (and elsewhere in LogManager) is to let us go through the validation/warning logic during startup. Is that right?
   
   Are the other usages of `currentDefaultConfig` safe as-is? What happens if someone reconfigures the log config? Is that handled via `LogManager#fetchLogConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671566299



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         // which were written in the new format prior to the version downgrade.
         val unconvertedRecords = FetchResponse.recordsOrFail(partitionData)
         val downConvertMagic =
-          logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
-            if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          logConfig.map(_.recordVersion.value).flatMap { magic =>

Review comment:
       With IBP 3.0, this will always be 3.0, so it works as you said. Even though it's a bit roundabout. I could have the IBP check here, but it seems to spread the logic into more places. I could add a comment perhaps. Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672253775



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Oh, interesting, so it does specify that a copy is returned. I had missed that. Since the mock config repository does not implement it correctly, as you found, I think it's better to handle this via a separate PR (it's possible that unrelated tests may fail as a result and may need fixing).

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Regarding unmodifiable `Properties`, there is no built-in way. My concern was something like the `MockConfigRepository` issue you found where copies are not done with the assumption that the caller should not mutate (even though there is no simple way to prevent that - another reason not to use `Properties`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672245627



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       Yeah, I do agree with you. I don't think that anyone already issues `3.0-IV1` anyway.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       For my own education, could `Properties` by unmodifiable somehow? 
   
   I pointed that out because the scaladoc of the `ConfigRepository` trait states the following `@return a copy of the configuration for the given resource`. It seems that the two main implementations respect this however `MockConfigRepository` does not. Therefore, I tend to agree with you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671609168



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         // which were written in the new format prior to the version downgrade.
         val unconvertedRecords = FetchResponse.recordsOrFail(partitionData)
         val downConvertMagic =
-          logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
-            if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          logConfig.map(_.recordVersion.value).flatMap { magic =>

Review comment:
       I updated the comment to reflect this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672245627



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       Yeah, I do agree with you. I don't think that anyone already issues `3.0-IV1` anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672255739



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Regarding unmodifiable `Properties`, there is no built-in way. My concern was something like the `MockConfigRepository` issue you found where copies are not done with the assumption that the caller should not mutate (even though there is no simple way to prevent that - another reason not to use `Properties`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672250729



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       For my own education, could `Properties` by unmodifiable somehow? 
   
   I pointed that out because the scaladoc of the `ConfigRepository` trait states the following `@return a copy of the configuration for the given resource`. It seems that the two main implementations respect this however `MockConfigRepository` does not. Therefore, I tend to agree with you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882511896






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671566299



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         // which were written in the new format prior to the version downgrade.
         val unconvertedRecords = FetchResponse.recordsOrFail(partitionData)
         val downConvertMagic =
-          logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
-            if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          logConfig.map(_.recordVersion.value).flatMap { magic =>

Review comment:
       With IBP 3.0, the message format version will always be 3.0 and the record version will always be 2, so it works as you said. Even though it's a bit roundabout. I could have the IBP check here, but it seems to spread the logic into more places. I could add a comment perhaps. Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672253775



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Oh, interesting, so it does specify that a copy is returned. I had missed that. Since the mock config repository does not implement it correctly, as you found, I think it's better to handle this via a separate PR (it's possible that unrelated tests may fail as a result and may need fixing).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671609095



##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
       I added a comment to `ApiVersion`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-881910077


   @hachikuji @dajac @mumrah I addressed the comments or explained why I didn't. The build looks good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#issuecomment-882544788


   Cherry-pick for 3.0: https://github.com/apache/kafka/pull/11078 (no re-review required, but running the PR build due to conflicts)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671401329



##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +

Review comment:
       Remove "if inter-broker" in the first sentence?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         // which were written in the new format prior to the version downgrade.
         val unconvertedRecords = FetchResponse.recordsOrFail(partitionData)
         val downConvertMagic =
-          logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
-            if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+          logConfig.map(_.recordVersion.value).flatMap { magic =>

Review comment:
       Hmm, I thought we were going to simplify this. Basically ignore the log config and just rely on the fetch version. The intuition is that _all_ new data will need to be down-converted anyway. Perhaps we want to wait until the IBP is at 3.0 for this logic to kick in?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +
+        "`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should" +
+        "be a valid ApiVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a " +

Review comment:
       No need to fix it here since this has been here forever, but it's a little weird for the doc to mention a classname. Will users have any idea what that means?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +
+        "`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should" +

Review comment:
       nit: missing space at the end

##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -270,7 +271,8 @@ object ConfigCommand extends Config {
     }
     if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
       println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
-        s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
+        "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +

Review comment:
       Should we also mention that the configuration is deprecated? Same question `TopicCommand` and the warning message in `LogConfig`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r672253775



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Oh, interesting, so it does specify that a copy is returned. I had missed that. Since the mock config repository does not implement it correctly, as you found, I think it's better to handle this via a separate PR (it's possible that unrelated tests may fail as a result and may need fixing).

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
       Regarding unmodifiable `Properties`, there is no built-in way. My concern was something like the `MockConfigRepository` issue you found where copies are not done with the assumption that the caller should not mutate (even though there is no simple way to prevent that - another reason not to use `Properties`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671565792



##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +
+        "`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should" +
+        "be a valid ApiVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a " +

Review comment:
       Good point, I can fix that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671387408



##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +

Review comment:
       Good point, it should be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r670557926



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -253,9 +259,10 @@ class LogManager(logDirs: Seq[File],
                            hadCleanShutdown: Boolean,
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
+                           defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigOverrides.getOrElse(topicPartition.topic, currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)

Review comment:
       Regarding `currentDefaultConfig`, the answer is yes for `LogManager`. There is one case in `DynamicBrokerConfig` that could be handled better, I'll submit a separate PR for that (it's unrelated to the changes here).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org