You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:24:37 UTC
[kafka] branch trunk updated: MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e67711af71a MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
e67711af71a is described below
commit e67711af71a3d21e43487933f05406c320646474
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Aug 9 13:24:24 2022 -0700
MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
BrokerMetadataSnapshotter should split up record lists that exceed the batch size.
Reviewers: David Arthur <mu...@gmail.com>
---
.../metadata/BrokerMetadataSnapshotter.scala | 36 +++++++++++++++++++++-
.../metadata/BrokerMetadataSnapshotterTest.scala | 30 ++++++++++++++++++
.../kafka/controller/SnapshotGeneratorTest.java | 3 +-
3 files changed, 67 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index dd77b277c8b..2a236ca7497 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -24,18 +24,50 @@ import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter
+import java.util.function.Consumer
+
trait SnapshotWriterBuilder {
def build(committedOffset: Long,
committedEpoch: Int,
lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]]
}
+/**
+ * The RecordListConsumer takes as input a potentially long list of records, and feeds the
+ * SnapshotWriter a series of smaller lists of records.
+ *
+ * Note: from the perspective of Kafka, the snapshot file is really just a list of records,
+ * and we don't care about batches. Batching is irrelevant to the meaning of the snapshot.
+ */
+class RecordListConsumer(
+ val maxRecordsInBatch: Int,
+ val writer: SnapshotWriter[ApiMessageAndVersion]
+) extends Consumer[java.util.List[ApiMessageAndVersion]] {
+ override def accept(messages: java.util.List[ApiMessageAndVersion]): Unit = {
+ var i = 0
+ while (i < messages.size()) {
+ writer.append(messages.subList(i, Math.min(i + maxRecordsInBatch, messages.size())));
+ i += maxRecordsInBatch
+ }
+ }
+}
+
class BrokerMetadataSnapshotter(
brokerId: Int,
val time: Time,
threadNamePrefix: Option[String],
writerBuilder: SnapshotWriterBuilder
) extends Logging with MetadataSnapshotter {
+ /**
+ * The maximum number of records we will put in each batch.
+ *
+ * From the perspective of the Raft layer, the limit on batch size is specified in terms of
+ * bytes, not number of records. See {@link KafkaRaftClient#MAX_BATCH_SIZE_BYTES} for details.
+ * However, it's more convenient to limit the batch size here in terms of number of records.
+ * So we chose a low number that will not cause problems.
+ */
+ private val maxRecordsInBatch = 1024
+
private val logContext = new LogContext(s"[BrokerMetadataSnapshotter id=$brokerId] ")
logIdent = logContext.logPrefix()
@@ -77,9 +109,11 @@ class BrokerMetadataSnapshotter(
class CreateSnapshotEvent(image: MetadataImage,
writer: SnapshotWriter[ApiMessageAndVersion])
extends EventQueue.Event {
+
override def run(): Unit = {
try {
- image.write(writer.append(_))
+ val consumer = new RecordListConsumer(maxRecordsInBatch, writer)
+ image.write(consumer)
writer.freeze()
} finally {
try {
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index e6702ee287f..ff2326c92fa 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CompletableFuture, CountDownLatch}
import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.metadata.FenceBrokerRecord
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
import org.apache.kafka.common.utils.Time
@@ -33,6 +34,8 @@ import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter,
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
+import java.util
+import java.util.Arrays.asList
import scala.compat.java8.OptionConverters._
class BrokerMetadataSnapshotterTest {
@@ -104,4 +107,31 @@ class BrokerMetadataSnapshotterTest {
snapshotter.close()
}
}
+
+ class MockSnapshotWriter extends SnapshotWriter[ApiMessageAndVersion] {
+ val batches = new util.ArrayList[util.List[ApiMessageAndVersion]]
+ override def snapshotId(): OffsetAndEpoch = new OffsetAndEpoch(0, 0)
+ override def lastContainedLogOffset(): Long = 0
+ override def lastContainedLogEpoch(): Int = 0
+ override def isFrozen: Boolean = false
+ override def append(batch: util.List[ApiMessageAndVersion]): Unit = batches.add(batch)
+ override def freeze(): Unit = {}
+ override def close(): Unit = {}
+ }
+
+ @Test
+ def testRecordListConsumer(): Unit = {
+ val writer = new MockSnapshotWriter()
+ val consumer = new RecordListConsumer(3, writer)
+ val m = new ApiMessageAndVersion(new FenceBrokerRecord().setId(1).setEpoch(1), 0.toShort)
+ consumer.accept(asList(m, m))
+ assertEquals(asList(asList(m, m)), writer.batches)
+ consumer.accept(asList(m))
+ assertEquals(asList(asList(m, m), asList(m)), writer.batches)
+ consumer.accept(asList(m, m, m, m))
+ assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m)), writer.batches)
+ consumer.accept(asList(m, m, m, m, m, m, m, m))
+ assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m), asList(m, m, m), asList(m, m, m), asList(m, m)),
+ writer.batches)
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
index 2c61dbcdc74..f7fa18f20a4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.OptionalLong;
import java.util.Optional;
+import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,7 +95,7 @@ public class SnapshotGeneratorTest {
) {
return RecordsSnapshotWriter.createWithHeader(
() -> createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, 1)),
- 1024,
+ MAX_BATCH_SIZE_BYTES,
MemoryPool.NONE,
new MockTime(),
lastContainedLogTime,