Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:24:37 UTC

[kafka] branch trunk updated: MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e67711af71a MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
e67711af71a is described below

commit e67711af71a3d21e43487933f05406c320646474
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Aug 9 13:24:24 2022 -0700

    MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
    
    BrokerMetadataSnapshotter should split up record lists that exceed the batch size.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../metadata/BrokerMetadataSnapshotter.scala       | 36 +++++++++++++++++++++-
 .../metadata/BrokerMetadataSnapshotterTest.scala   | 30 ++++++++++++++++++
 .../kafka/controller/SnapshotGeneratorTest.java    |  3 +-
 3 files changed, 67 insertions(+), 2 deletions(-)
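
At its core, the change is a chunking step: a list of n records is appended as
ceil(n / maxRecordsInBatch) batches rather than as one potentially oversized batch.
A minimal standalone sketch of that splitting rule (the object and method names here
are illustrative, not part of the commit, which operates on java.util.List and
appends each sublist to a SnapshotWriter):

    object BatchSplitSketch {
      // Split `records` into sublists of at most `maxRecordsInBatch` elements,
      // preserving order; only the final sublist may be shorter.
      def splitIntoBatches[T](records: Seq[T], maxRecordsInBatch: Int): Seq[Seq[T]] = {
        require(maxRecordsInBatch > 0, "batch size must be positive")
        records.grouped(maxRecordsInBatch).toList
      }

      def main(args: Array[String]): Unit = {
        // 8 records with a limit of 3 yield batches of sizes 3, 3 and 2.
        println(splitIntoBatches(1 to 8, 3).map(_.size)) // prints List(3, 3, 2)
      }
    }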

diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index dd77b277c8b..2a236ca7497 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -24,18 +24,50 @@ import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.snapshot.SnapshotWriter
 
+import java.util.function.Consumer
+
 trait SnapshotWriterBuilder {
   def build(committedOffset: Long,
             committedEpoch: Int,
             lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]]
 }
 
+/**
+ * The RecordListConsumer takes a potentially long list of records and feeds the
+ * SnapshotWriter a series of smaller sublists of records.
+ *
+ * Note: from Kafka's perspective, the snapshot file is really just a list of records;
+ * how those records are grouped into batches does not affect the snapshot's meaning.
+ */
+class RecordListConsumer(
+  val maxRecordsInBatch: Int,
+  val writer: SnapshotWriter[ApiMessageAndVersion]
+) extends Consumer[java.util.List[ApiMessageAndVersion]] {
+  override def accept(messages: java.util.List[ApiMessageAndVersion]): Unit = {
+    var i = 0
+    while (i < messages.size()) {
+      writer.append(messages.subList(i, Math.min(i + maxRecordsInBatch, messages.size())))
+      i += maxRecordsInBatch
+    }
+  }
+}
+
 class BrokerMetadataSnapshotter(
   brokerId: Int,
   val time: Time,
   threadNamePrefix: Option[String],
   writerBuilder: SnapshotWriterBuilder
 ) extends Logging with MetadataSnapshotter {
+  /**
+   * The maximum number of records we will put in each batch.
+   *
+   * From the perspective of the Raft layer, the limit on batch size is specified in terms of
+   * bytes, not number of records; see {@link KafkaRaftClient#MAX_BATCH_SIZE_BYTES} for details.
+   * However, it is more convenient to limit the batch size here in terms of number of records,
+   * so we chose a conservatively low count that keeps batches well under the byte limit.
+   */
+  private val maxRecordsInBatch = 1024
+
   private val logContext = new LogContext(s"[BrokerMetadataSnapshotter id=$brokerId] ")
   logIdent = logContext.logPrefix()
 
@@ -77,9 +109,11 @@ class BrokerMetadataSnapshotter(
   class CreateSnapshotEvent(image: MetadataImage,
                             writer: SnapshotWriter[ApiMessageAndVersion])
         extends EventQueue.Event {
+
     override def run(): Unit = {
       try {
-        image.write(writer.append(_))
+        val consumer = new RecordListConsumer(maxRecordsInBatch, writer)
+        image.write(consumer)
         writer.freeze()
       } finally {
         try {
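
The choice of 1024 records is deliberately conservative relative to the Raft layer's
byte-based limit. A back-of-the-envelope check (both constants below are assumptions
for illustration: the 8 MiB figure stands in for KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
and the average record size is hypothetical, not measured):

    object BatchBudgetSketch extends App {
      // Illustrative arithmetic only; neither constant is read from Kafka here.
      val assumedMaxBatchSizeBytes = 8 * 1024 * 1024 // assumption: Raft's MAX_BATCH_SIZE_BYTES
      val assumedAvgRecordBytes = 1024               // assumption: typical serialized record size
      val maxRecordsInBatch = 1024
      // A full batch is ~1 MiB under these assumptions, well below the assumed byte limit.
      assert(maxRecordsInBatch * assumedAvgRecordBytes < assumedMaxBatchSizeBytes)
    }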
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index e6702ee287f..ff2326c92fa 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CountDownLatch}
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.metadata.FenceBrokerRecord
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
 import org.apache.kafka.common.utils.Time
@@ -33,6 +34,8 @@ import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter,
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 
+import java.util
+import java.util.Arrays.asList
 import scala.compat.java8.OptionConverters._
 
 class BrokerMetadataSnapshotterTest {
@@ -104,4 +107,31 @@ class BrokerMetadataSnapshotterTest {
       snapshotter.close()
     }
   }
+
+  class MockSnapshotWriter extends SnapshotWriter[ApiMessageAndVersion] {
+    val batches = new util.ArrayList[util.List[ApiMessageAndVersion]]
+    override def snapshotId(): OffsetAndEpoch = new OffsetAndEpoch(0, 0)
+    override def lastContainedLogOffset(): Long = 0
+    override def lastContainedLogEpoch(): Int = 0
+    override def isFrozen: Boolean = false
+    override def append(batch: util.List[ApiMessageAndVersion]): Unit = batches.add(batch)
+    override def freeze(): Unit = {}
+    override def close(): Unit = {}
+  }
+
+  @Test
+  def testRecordListConsumer(): Unit = {
+    val writer = new MockSnapshotWriter()
+    val consumer = new RecordListConsumer(3, writer)
+    val m = new ApiMessageAndVersion(new FenceBrokerRecord().setId(1).setEpoch(1), 0.toShort)
+    consumer.accept(asList(m, m))
+    assertEquals(asList(asList(m, m)), writer.batches)
+    consumer.accept(asList(m))
+    assertEquals(asList(asList(m, m), asList(m)), writer.batches)
+    consumer.accept(asList(m, m, m, m))
+    assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m)), writer.batches)
+    consumer.accept(asList(m, m, m, m, m, m, m, m))
+    assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m), asList(m, m, m), asList(m, m, m), asList(m, m)),
+      writer.batches)
+  }
 }
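
The expected lists in testRecordListConsumer follow directly from the splitting rule
with a limit of 3: each accept() call is chunked independently, so 4 records yield
sublists of sizes 3 and 1, and 8 records yield 3, 3 and 2. A quick illustrative check
of that arithmetic (chunkSizes is a hypothetical helper, not part of the commit):

    object ChunkSizesSketch extends App {
      // Sizes of the sublists produced for n records with a per-batch limit of `max`.
      def chunkSizes(n: Int, max: Int): Seq[Int] =
        (0 until n by max).map(i => math.min(max, n - i))

      assert(chunkSizes(4, 3) == Seq(3, 1))    // third accept() call above
      assert(chunkSizes(8, 3) == Seq(3, 3, 2)) // final accept() call above
    }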
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
index 2c61dbcdc74..f7fa18f20a4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.OptionalLong;
 import java.util.Optional;
 
+import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,7 +95,7 @@ public class SnapshotGeneratorTest {
     ) {
         return RecordsSnapshotWriter.createWithHeader(
             () -> createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, 1)),
-            1024,
+            MAX_BATCH_SIZE_BYTES,
             MemoryPool.NONE,
             new MockTime(),
             lastContainedLogTime,