You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/04 19:50:42 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

mumrah commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587767322



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
+    topicPartition: TopicPartition,
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
+  ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(

Review comment:
       👍  Makes sense to move this here, since KafkaMetadataLog fully owns the Log's lifecycle.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
+    topicPartition: TopicPartition,
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
+  ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(
+      dir = dataDir,
+      config = defaultLogConfig,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = scheduler,
+      brokerTopicStats = new BrokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = Int.MaxValue,
+      producerIdExpirationCheckIntervalMs = Int.MaxValue,
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = false
+    )
+
+    KafkaMetadataLog(log, topicPartition, maxFetchSizeInBytes)
+  }
+
+  private def apply(

Review comment:
       Do we still need this additional private factory? Is it used by tests or something? It looks like it's only called from a few lines above, so maybe we can just combine the two?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 
     log.close()
 
-    val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val secondLog = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(snapshotId, secondLog.latestSnapshotId.get)
     assertEquals(snapshotId.offset, secondLog.startOffset)
     assertEquals(snapshotId.epoch, secondLog.lastFetchedEpoch)
     assertEquals(snapshotId.offset, secondLog.endOffset().offset)
     assertEquals(snapshotId.offset, secondLog.highWatermark.offset)
   }
+
+  @Test
+  def testMaxBatchSize(): Unit = {
+    val leaderEpoch = 5
+    val maxBatchSizeInBytes = 16384
+    val recordSize = 64

Review comment:
       Does it matter that maxBatchSizeInBytes is a multiple of the record size? Could you have done 
   
   > buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + 1) 
   
   to get the same RecordTooLargeException?

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -220,25 +220,14 @@ class KafkaRaftManager[T](
   }
 
   private def buildMetadataLog(): KafkaMetadataLog = {
-    val defaultProps = LogConfig.extractLogConfigMap(config)

Review comment:
       I see we are no longer calling this, but rather explicitly populating the log properties. Are there other log configs we might want to expose? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org