You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/07/12 15:02:52 UTC
[kafka] branch trunk updated: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)
This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98726c2bac8 KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)
98726c2bac8 is described below
commit 98726c2bac8bc221ec2df64b35ce15d0ee71893f
Author: dengziming <de...@gmail.com>
AuthorDate: Tue Jul 12 23:02:43 2022 +0800
KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)
There are 3 bugs when a broker generates a snapshot.
1. Broker should not generate snapshots until it starts publishing.
Before a broker starts publishing, BrokerMetadataListener._publisher=None, so _publisher.foreach(publish) will do nothing, and featuresDelta.metadataVersionChange().isPresent is always true. As a result, we will generate a snapshot on every commit because we believe the metadata version has changed. Here are the logs; note that offset 1 is a LeaderChangeMessage, so there is no snapshot for it:
[2022-06-08 13:07:43,010] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 0... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,222] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 2... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,727] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 3... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:44,228] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 4... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
2. We should compute metadataVersionChanged before _publisher.foreach(publish)
After _publisher.foreach(publish), BrokerMetadataListener._delta is always empty, so metadataVersionChanged is always false. This means we will never trigger snapshot generation even if the metadata version has changed.
3. We should try to generate a snapshot when starting publishing
When we start publishing, there may have been a metadata version change, so we should try to generate a snapshot before the first publish.
Reviewers: Jason Gustafson <ja...@confluent.io>, Divij Vaidya <di...@amazon.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 9 +---
.../server/metadata/BrokerMetadataListener.scala | 39 +++++++++-----
.../metadata/BrokerMetadataSnapshotter.scala | 26 +++++----
.../metadata/BrokerMetadataListenerTest.scala | 61 ++++++++++++++++++++--
.../metadata/BrokerMetadataSnapshotterTest.scala | 6 +--
5 files changed, 104 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d0d2a98b483..eb21c1ed25e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -62,13 +62,8 @@ class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
extends SnapshotWriterBuilder {
override def build(committedOffset: Long,
committedEpoch: Int,
- lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
- raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).
- asScala.getOrElse(
- throw new RuntimeException("A snapshot already exists with " +
- s"committedOffset=$committedOffset, committedEpoch=$committedEpoch, " +
- s"lastContainedLogTime=$lastContainedLogTime")
- )
+ lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]] = {
+ raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).asScala
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index fa0bc52d7aa..3b79526a954 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -117,26 +117,34 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
- _publisher.foreach(publish)
- // If we detected a change in metadata.version, generate a local snapshot
- val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
- featuresDelta.metadataVersionChange().isPresent
+ _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+ if (shouldSnapshot()) {
+ maybeStartSnapshot()
}
- snapshotter.foreach { snapshotter =>
- _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
- if (shouldSnapshot() || metadataVersionChanged) {
- if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
- _bytesSinceLastSnapshot = 0L
- }
- }
- }
+ _publisher.foreach(publish)
}
}
private def shouldSnapshot(): Boolean = {
- _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+ (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+ }
+
+ private def metadataVersionChanged(): Boolean = {
+ // The _publisher is empty before starting publishing, and we won't compute feature delta
+ // until we starting publishing
+ _publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>
+ featuresDelta.metadataVersionChange().isPresent
+ }
+ }
+
+ private def maybeStartSnapshot(): Unit = {
+ snapshotter.foreach { snapshotter =>
+ if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+ _bytesSinceLastSnapshot = 0L
+ }
+ }
}
/**
@@ -213,7 +221,7 @@ class BrokerMetadataListener(
s" ${messageAndVersion.message}")
}
- _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
+ _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
numRecords += 1
@@ -244,6 +252,9 @@ class BrokerMetadataListener(
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
try {
+ if (metadataVersionChanged()) {
+ maybeStartSnapshot()
+ }
publish(publisher)
future.complete(null)
} catch {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index b5179c32f14..dd77b277c8b 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.snapshot.SnapshotWriter
trait SnapshotWriterBuilder {
def build(committedOffset: Long,
committedEpoch: Int,
- lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+ lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]]
}
class BrokerMetadataSnapshotter(
@@ -51,20 +51,26 @@ class BrokerMetadataSnapshotter(
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
- if (_currentSnapshotOffset == -1L) {
+ if (_currentSnapshotOffset != -1) {
+ info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
+ s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
+ false
+ } else {
val writer = writerBuilder.build(
image.highestOffsetAndEpoch().offset,
image.highestOffsetAndEpoch().epoch,
lastContainedLogTime
)
- _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
- info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
- eventQueue.append(new CreateSnapshotEvent(image, writer))
- true
- } else {
- warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
- s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
- false
+ if (writer.nonEmpty) {
+ _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
+ info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
+ eventQueue.append(new CreateSnapshotEvent(image, writer.get))
+ true
+ } else {
+ info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
+ s"there is already a snapshot at offset ${image.highestOffsetAndEpoch().offset}")
+ false
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index f75823a0292..6de448f2802 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -20,14 +20,13 @@ package kafka.server.metadata
import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Optional}
-
-import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
@@ -240,6 +239,43 @@ class BrokerMetadataListenerTest {
}
}
+ @Test
+ def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
+ val snapshotter = new MockMetadataSnapshotter()
+ val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+ maxBytesBetweenSnapshots = 1000L)
+
+ updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+ listener.getImageRecords().get()
+ assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot on metadata version change before starting publishing")
+ }
+
+ @Test
+ def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
+ val snapshotter = new MockMetadataSnapshotter()
+ val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+ maxBytesBetweenSnapshots = 1000L)
+
+ val endOffset = 100L
+ updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), endOffset)
+ listener.startPublishing(new MockMetadataPublisher()).get()
+ assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+ }
+
+ @Test
+ def testSnapshotAfterMetadataVersionChange(): Unit = {
+ val snapshotter = new MockMetadataSnapshotter()
+ val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+ maxBytesBetweenSnapshots = 1000L)
+ listener.startPublishing(new MockMetadataPublisher()).get()
+
+ val endOffset = 100L
+ updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, (MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
+ // Waiting for the metadata version update to get processed
+ listener.getImageRecords().get()
+ assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update")
+ }
+
private def registerBrokers(
listener: BrokerMetadataListener,
brokerIds: Iterable[Int],
@@ -285,4 +321,23 @@ class BrokerMetadataListenerTest {
)
}
+ private def updateFeature(
+ listener: BrokerMetadataListener,
+ feature: String,
+ version: Short,
+ endOffset: Long
+ ): Unit = {
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ endOffset,
+ 0,
+ util.Arrays.asList(
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(feature).
+ setFeatureLevel(version), 0.toShort)
+ )
+ )
+ )
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index 82426611425..e6702ee287f 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -20,7 +20,6 @@ package kafka.server.metadata
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CompletableFuture, CountDownLatch}
-
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
@@ -34,6 +33,7 @@ import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter,
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
+import scala.compat.java8.OptionConverters._
class BrokerMetadataSnapshotterTest {
@Test
@@ -48,7 +48,7 @@ class BrokerMetadataSnapshotterTest {
override def build(committedOffset: Long,
committedEpoch: Int,
- lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
+ lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]] = {
val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
RecordsSnapshotWriter.createWithHeader(
() => {
@@ -62,7 +62,7 @@ class BrokerMetadataSnapshotterTest {
lastContainedLogTime,
CompressionType.NONE,
MetadataRecordSerde.INSTANCE
- ).get();
+ ).asScala
}
def consumeSnapshotBuffer(committedOffset: Long, committedEpoch: Int)(buffer: ByteBuffer): Unit = {