Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/16 16:30:42 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11036: KAFKA-12944: Assume message format version is 3.0 when inter-broker protocol is 3.0 or higher (KIP-724)

dajac commented on a change in pull request #11036:
URL: https://github.com/apache/kafka/pull/11036#discussion_r671370260



##########
File path: core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
##########
@@ -103,4 +109,38 @@ class AbstractPartitionTest {
       TestUtils.clearYammerMetrics()
     }
   }
+
+  protected def setupPartitionWithMocks(leaderEpoch: Int,
+                                      isLeader: Boolean): Partition = {

Review comment:
      nit: Alignment.
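      e.g. something like:
```scala
  protected def setupPartitionWithMocks(leaderEpoch: Int,
                                        isLeader: Boolean): Partition = {
```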

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -152,7 +149,7 @@ class LocalLogTest {
     val oldConfig = log.config
     assertEquals(oldConfig, log.config)
 
-    val newConfig = createLogConfig(segmentBytes=oldConfig.segmentSize + 1)
+    val newConfig = LogTestUtils.createLogConfig(segmentBytes=oldConfig.segmentSize + 1)

Review comment:
      nit: While we're here, could we add a space before and after `=`?
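      i.e.:
```scala
    val newConfig = LogTestUtils.createLogConfig(segmentBytes = oldConfig.segmentSize + 1)
```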

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,27 +411,49 @@ class LogManager(logDirs: Seq[File],
    *  Start the background threads to flush logs and do log cleanup
    */
   def startup(topicNames: Set[String]): Unit = {
-    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+    // ensure consistency between default config and overrides
+    val defaultConfig = currentDefaultConfig
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }
 
   // visible for testing
-  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+  @nowarn("cat=deprecation")
+  private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
     val topicConfigOverrides = mutable.Map[String, LogConfig]()
-    val defaultProps = currentDefaultConfig.originals()
+    val defaultProps = defaultConfig.originals()
     topicNames.foreach { topicName =>
-      val overrides = configRepository.topicConfig(topicName)
+      var overrides = configRepository.topicConfig(topicName)

Review comment:
      It seems that we get a copy of the config here, so I suppose we could directly mutate it below instead of making another copy. What do you think?
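      To illustrate, a rough sketch of what I mean (assuming `topicConfig` really does hand back a fresh `Properties` each time, and that names like `interBrokerProtocolVersion`, `defaultProps` and `topicConfigOverrides` are in scope as in this method; this is only indicative, not the exact code):
```scala
    topicNames.foreach { topicName =>
      // sketch only: the repository already returns a fresh Properties copy,
      // so the deprecated key can be stripped from it in place instead of cloning again
      val overrides = configRepository.topicConfig(topicName)
      if (!overrides.isEmpty) {
        if (LogConfig.shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion))
          overrides.remove(LogConfig.MessageFormatVersionProp)
        topicConfigOverrides(topicName) = LogConfig.fromProps(defaultProps, overrides)
      }
    }
```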

##########
File path: core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
##########
@@ -163,6 +163,25 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     records
   }
 
+
+  /**
+   * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
+   * records to each partition
+   */
+  protected def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                topicName: String,

Review comment:
       nit: Could we align the arguments?
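      e.g.:
```scala
  protected def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
                                          topicName: String,
```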

##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -171,12 +171,24 @@
     public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
         "creating a new log segment.";
 
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
     public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
-    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
-        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
-        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
-        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
-        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+
+    /**
+     * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
+     * for most situations.
+     */
+    @Deprecated
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
+        "will use to append messages to the logs if inter-broker. The value of this config is always assumed to be `3.0` if " +

Review comment:
       It seems that either the sentence is not finished or `if inter-broker` is not necessary.
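      e.g. simply dropping it so that the first sentence reads: "Specify the message format version the broker will use to append messages to the logs." (just a suggestion).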

##########
File path: core/src/main/scala/kafka/log/LogConfig.scala
##########
@@ -479,4 +501,31 @@ object LogConfig {
     logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
+
+  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
+    interBrokerProtocolVersion >= KAFKA_3_0_IV1

Review comment:
      I wonder if we should introduce `KAFKA_3_0_IV2` to be safe here. I don't think anyone uses `KAFKA_3_0_IV1` yet, but what would happen if someone does?
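      Purely hypothetical, since `KAFKA_3_0_IV2` does not exist yet, but the guard would then become:
```scala
  // hypothetical new IV, so that anyone already on KAFKA_3_0_IV1 keeps the current behaviour
  def shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion: ApiVersion): Boolean =
    interBrokerProtocolVersion >= KAFKA_3_0_IV2
```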

##########
File path: core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+
+import java.util
+import java.util.{Optional, Properties}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class BaseFetchRequestTest extends BaseRequestTest {
+
+  protected var producer: KafkaProducer[String, String] = null
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  protected def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long],
+                                 version: Short): FetchRequest = {
+    val topicIds = getTopicIds().asJava
+    FetchRequest.Builder.forConsumer(version, Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap), topicIds)
+      .setMaxBytes(maxResponseBytes).build()
+  }
+
+  protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {

Review comment:
       nit: Alignment.
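      e.g.:
```scala
  protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
                                   offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
```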

##########
File path: core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+
+import java.util
+import java.util.{Optional, Properties}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class BaseFetchRequestTest extends BaseRequestTest {
+
+  protected var producer: KafkaProducer[String, String] = null
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  protected def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long],
+                                 version: Short): FetchRequest = {

Review comment:
       nit: Alignment.
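      e.g.:
```scala
  protected def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
                                   offsetMap: Map[TopicPartition, Long],
                                   version: Short): FetchRequest = {
```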



