Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/04 04:49:30 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji opened a new pull request #10256:
URL: https://github.com/apache/kafka/pull/10256


   We increased the max batch size to 8 MB in https://github.com/apache/kafka/pull/10063, but we forgot to propagate the change to the `Log` instance, which does its own validation. I had actually noted this in a review comment on that PR (https://github.com/apache/kafka/pull/10063#pullrequestreview-591715059), but forgot about it afterwards.
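For readers less familiar with the issue, the failure mode can be modeled as two independent size checks. The following is a minimal Scala sketch under stated assumptions, not Kafka's actual code: `SimpleLog`, `logConfigFor` and the constant names are hypothetical, and 1048588 bytes is assumed as the default `max.message.bytes` when the log config is not overridden. An 8 MB batch that the raft layer accepts is rejected by a log left on the default, and accepted once the limit is copied into the log's properties, which mirrors the `props.put(LogConfig.MaxMessageBytesProp, ...)` line in this PR's diff.

```scala
import java.util.Properties

object BatchLimitSketch {
  // Assumed default for max.message.bytes when nothing overrides it.
  val DefaultMaxMessageBytes: Int = 1048588
  // Raft-level limit after the increase to 8 MB.
  val RaftMaxBatchSizeInBytes: Int = 8 * 1024 * 1024

  // Hypothetical stand-in for a log that validates batch sizes against its own config.
  final class SimpleLog(props: Properties) {
    private val maxMessageBytes: Int =
      Option(props.getProperty("max.message.bytes"))
        .map(_.toInt)
        .getOrElse(DefaultMaxMessageBytes)

    // Returns true if the batch is accepted, false if it would be "too large".
    def append(batchSizeInBytes: Int): Boolean = batchSizeInBytes <= maxMessageBytes
  }

  // The fix in spirit: derive the log config from the raft limit instead of defaults.
  def logConfigFor(maxBatchSizeInBytes: Int): Properties = {
    val props = new Properties()
    props.setProperty("max.message.bytes", maxBatchSizeInBytes.toString)
    props
  }
}
```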
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587774537



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
+    topicPartition: TopicPartition,
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
+  ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(
+      dir = dataDir,
+      config = defaultLogConfig,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = scheduler,
+      brokerTopicStats = new BrokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = Int.MaxValue,
+      producerIdExpirationCheckIntervalMs = Int.MaxValue,
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = false
+    )
+
+    KafkaMetadataLog(log, topicPartition, maxFetchSizeInBytes)
+  }
+
+  private def apply(

Review comment:
       Yeah, I got a little lazy here. Let me do the snapshot stuff in a helper method.







[GitHub] [kafka] hachikuji commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587777995



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 
     log.close()
 
-    val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val secondLog = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(snapshotId, secondLog.latestSnapshotId.get)
     assertEquals(snapshotId.offset, secondLog.startOffset)
     assertEquals(snapshotId.epoch, secondLog.lastFetchedEpoch)
     assertEquals(snapshotId.offset, secondLog.endOffset().offset)
     assertEquals(snapshotId.offset, secondLog.highWatermark.offset)
   }
+
+  @Test
+  def testMaxBatchSize(): Unit = {
+    val leaderEpoch = 5
+    val maxBatchSizeInBytes = 16384
+    val recordSize = 64

Review comment:
       It's a little tricky to get the alignment perfect after accounting for the overhead of the batch itself. 
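To make the alignment issue concrete, the sketch below estimates how many records fit in a 16384-byte batch. It assumes the v2 record batch format (a 61-byte batch header, zigzag varints for the per-record fields), null keys, 64-byte values, no headers, no compression and a zero timestamp delta. These are assumptions; the test's `buildFullBatch` helper may construct records differently, so the exact count is illustrative.

```scala
object BatchAlignmentSketch {
  // Size of the v2 record batch header (baseOffset through the record count).
  val BatchHeaderBytes: Int = 61

  // Number of bytes a non-negative value needs as a base-128 varint.
  def unsignedVarintSize(value: Long): Int = {
    var v = value
    var bytes = 1
    while ((v & ~0x7FL) != 0L) {
      v = v >>> 7
      bytes += 1
    }
    bytes
  }

  // Signed ints are zigzag-encoded before being written as varints.
  def varintSize(n: Int): Int =
    unsignedVarintSize(((n << 1) ^ (n >> 31)).toLong & 0xFFFFFFFFL)

  // Size of one record, assuming a null key, no headers and a zero timestamp delta.
  def recordSize(offsetDelta: Int, valueSize: Int): Int = {
    val body =
      1 +                                  // attributes
      varintSize(0) +                      // timestampDelta (assumed 0)
      varintSize(offsetDelta) +            // offsetDelta
      varintSize(-1) +                     // key length (-1 means null key)
      varintSize(valueSize) + valueSize +  // value length and value bytes
      varintSize(0)                        // header count
    varintSize(body) + body                // length prefix plus body
  }

  // Total batch size for n records whose values are valueSize bytes each.
  def batchSize(n: Int, valueSize: Int): Int =
    BatchHeaderBytes + (0 until n).map(i => recordSize(i, valueSize)).sum

  // Largest record count whose batch still fits within maxBatchBytes.
  def maxRecordsThatFit(maxBatchBytes: Int, valueSize: Int): Int = {
    var n = 0
    while (batchSize(n + 1, valueSize) <= maxBatchBytes) n += 1
    n
  }
}
```

Under these assumptions each record costs 73 or 74 bytes rather than 64, so the loop stops at 221 records (16351 bytes) instead of 16384 / 64 = 256, and no whole number of records lands exactly on 16384 bytes.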







[GitHub] [kafka] hachikuji commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587809203



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 

Review comment:
       I tried it out just to see if we got lucky on the alignment, but unfortunately we didn't.







[GitHub] [kafka] hachikuji merged pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji merged pull request #10256:
URL: https://github.com/apache/kafka/pull/10256


   





[GitHub] [kafka] hachikuji commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587835595



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -220,25 +220,14 @@ class KafkaRaftManager[T](
   }
 
   private def buildMetadataLog(): KafkaMetadataLog = {
-    val defaultProps = LogConfig.extractLogConfigMap(config)

Review comment:
       I filed this JIRA: https://issues.apache.org/jira/browse/KAFKA-12423.







[GitHub] [kafka] hachikuji commented on pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji commented on pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#issuecomment-790833577


   @ijuma Yes, probably. I'm working on it.





[GitHub] [kafka] mumrah commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

mumrah commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587767322



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (

Review comment:
       👍  Makes sense to move this here since KafkaMetadataLog fully owns the Log's lifecycle
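The ownership argument can be illustrated with a small Scala pattern that mirrors the `private` constructor shown in the diff's hunk header (`final class KafkaMetadataLog private (`): the only public entry point is a companion `apply` that creates the underlying resource itself, so the wrapper can close it without other holders to worry about. `Resource` and `MetadataLogWrapper` are hypothetical stand-ins for `Log` and `KafkaMetadataLog`, not their real APIs.

```scala
object OwnershipSketch {
  // Hypothetical stand-in for the underlying Log.
  final class Resource(val maxMessageBytes: Int) {
    private var open = true
    def isOpen: Boolean = open
    def close(): Unit = { open = false }
  }

  // The constructor is private, so callers cannot hand in a Resource they also hold.
  final class MetadataLogWrapper private (val resource: Resource) {
    // Closing the wrapper closes the resource it owns.
    def close(): Unit = resource.close()
  }

  object MetadataLogWrapper {
    // The factory builds the resource from the wrapper's own settings.
    def apply(maxBatchSizeInBytes: Int): MetadataLogWrapper =
      new MetadataLogWrapper(new Resource(maxBatchSizeInBytes))
  }
}
```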

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (

Review comment:
       Do we still need this additional private factory? Is it used by tests or something? It looks like it's only called from a few lines above; maybe we can just combine the two?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 

Review comment:
       Does it matter that `maxBatchSizeInBytes` is a multiple of the record size? Could you have done `buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + 1)` to get the same `RecordTooLargeException`?

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -220,25 +220,14 @@ class KafkaRaftManager[T](

Review comment:
       I see we are no longer calling this, but rather explicitly populating the log properties. Are there other log configs we might want to expose? 







[GitHub] [kafka] hachikuji commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

hachikuji commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587803031



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -220,25 +220,14 @@ class KafkaRaftManager[T](

Review comment:
       It seemed best for now to not allow overrides since this is a new usage of `Log` and we haven't had time to understand the impact of all of the configurations for this usage. Most of them are probably safe, but others do not even make sense (e.g. the retention settings). Let me open a JIRA so that we can consider which configs we want to expose for the metadata log and how we want to expose them.
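As an illustration only (not what KAFKA-12423 or Kafka settled on), one way to expose a limited set of overrides for the metadata log is an explicit allow-list: the batch-size-derived setting is always fixed, vetted keys may be overridden, and anything else (such as retention settings) is rejected. The allow-list contents and `metadataLogProps` are hypothetical.

```scala
import java.util.Properties

object MetadataLogConfigSketch {
  // Hypothetical allow-list: configs judged safe to override for the metadata log.
  val AllowedOverrides: Set[String] = Set("segment.bytes", "segment.ms")

  // Builds the metadata log properties: the fixed value first, then vetted overrides.
  // Returns Left with the rejected keys if any override is not allow-listed.
  def metadataLogProps(
    maxBatchSizeInBytes: Int,
    overrides: Map[String, String]
  ): Either[Set[String], Properties] = {
    val rejected = overrides.keySet -- AllowedOverrides
    if (rejected.nonEmpty) Left(rejected)
    else {
      val props = new Properties()
      // Always derived from the raft limit, never from user input.
      props.setProperty("max.message.bytes", maxBatchSizeInBytes.toString)
      overrides.foreach { case (k, v) => props.setProperty(k, v) }
      Right(props)
    }
  }
}
```

Rejecting unknown keys up front, rather than silently ignoring them, makes a misconfigured retention override visible at startup.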







[GitHub] [kafka] ijuma commented on pull request #10256: MINOR: Raft max batch size needs to propagate to log config

ijuma commented on pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#issuecomment-790289897


   Do we need a test?





[GitHub] [kafka] mumrah commented on a change in pull request #10256: MINOR: Raft max batch size needs to propagate to log config

mumrah commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587822370



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 

Review comment:
       Thanks for the explanation 👍 I figured it had something to do with the size overhead.



