Posted to commits@kafka.apache.org by js...@apache.org on 2022/07/12 15:02:52 UTC

[kafka] branch trunk updated: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98726c2bac8 KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)
98726c2bac8 is described below

commit 98726c2bac8bc221ec2df64b35ce15d0ee71893f
Author: dengziming <de...@gmail.com>
AuthorDate: Tue Jul 12 23:02:43 2022 +0800

    KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating (#12265)
    
    There are three bugs in how a broker generates snapshots.
    
    1. Broker should not generate snapshots until it starts publishing.
        Before a broker starts publishing, BrokerMetadataListener._publisher is None, so _publisher.foreach(publish) does nothing and the accumulated _delta is never applied and cleared. As a result, featuresDelta.metadataVersionChange().isPresent is always true, and we generate a snapshot on every commit because we believe the metadata version has changed. The log excerpt below shows the effect (offset 1 is a LeaderChangeMessage, so no snapshot is created there); a sketch of the faulty flow follows the excerpt:
    
    [2022-06-08 13:07:43,010] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 0... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
    [2022-06-08 13:07:43,222] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 2... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
    [2022-06-08 13:07:43,727] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 3... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
    [2022-06-08 13:07:44,228] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 4... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
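
    A minimal runnable sketch of the faulty flow. The types here are hypothetical stand-ins, not Kafka's classes (FeaturesDelta is reduced to one field): because no publisher ever consumes the delta, the pending metadata.version change is still visible at every commit, so every commit triggers a snapshot.

        // Sketch only: simplified stand-ins for BrokerMetadataListener state.
        object PrePublishSnapshotBug {
          // A pending metadata.version change, if any (stand-in for FeaturesDelta).
          final case class FeaturesDelta(metadataVersionChange: Option[Short])

          def main(args: Array[String]): Unit = {
            val publisher: Option[String] = None       // _publisher is None before startPublishing()
            var delta = FeaturesDelta(Some(7.toShort)) // bootstrap metadata.version record sits in _delta

            // Committed offsets from the log excerpt (offset 1, a LeaderChangeMessage, is skipped).
            for (offset <- Seq(0, 2, 3, 4)) {
              publisher.foreach(_ => delta = FeaturesDelta(None)) // publish would clear the delta; never runs
              if (delta.metadataVersionChange.isDefined)          // therefore always true here
                println(s"Creating a new snapshot at offset $offset...")
            }
          }
        }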
    
    2. We should compute metadataVersionChanged before _publisher.foreach(publish).
        After _publisher.foreach(publish) runs, BrokerMetadataListener._delta is always empty, so metadataVersionChanged is always false. This means we would never trigger snapshot generation even when the metadata version has changed; the sketch below illustrates the ordering problem.
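
    A minimal sketch of the ordering problem, using the same hypothetical stand-ins as above: once publishing has started, publish() applies and resets the delta, so a change check performed afterwards always comes up empty.

        object CheckAfterPublishBug {
          final case class FeaturesDelta(metadataVersionChange: Option[Short])

          def main(args: Array[String]): Unit = {
            // Wrong order (pre-fix): publish first, then check.
            var delta = FeaturesDelta(Some(8.toShort)) // a metadata.version change was just committed
            delta = FeaturesDelta(None)                // publish(publisher) applies and resets _delta
            println(s"checked after publish: ${delta.metadataVersionChange.isDefined}")  // false

            // Right order (post-fix): check, and maybe snapshot, before publishing.
            delta = FeaturesDelta(Some(8.toShort))
            println(s"checked before publish: ${delta.metadataVersionChange.isDefined}") // true
          }
        }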
    
    3. We should try to generate a snapshot when we start publishing.
        By the time publishing starts, the metadata version may already have changed, so we should try to generate a snapshot before the first publish; a sketch of the fix's shape follows.
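
    A condensed, runnable sketch of the fix's shape (method names match the diff below, but the bodies are hypothetical stand-ins): startPublishing installs the publisher first, then checks for a pending metadata.version change and snapshots before the first publish.

        object SnapshotOnStartPublishing {
          final case class FeaturesDelta(metadataVersionChange: Option[Short])

          var publisher: Option[String] = None
          var delta = FeaturesDelta(Some(7.toShort)) // change accumulated before publishing started

          // Mirrors the new guard in the diff: only meaningful once a publisher exists.
          def metadataVersionChanged(): Boolean =
            publisher.nonEmpty && delta.metadataVersionChange.isDefined

          def maybeStartSnapshot(): Unit = println("Creating a new snapshot...")

          def startPublishing(p: String): Unit = {
            publisher = Some(p)                                // install the publisher first
            if (metadataVersionChanged()) maybeStartSnapshot() // catch up before the first publish
            delta = FeaturesDelta(None)                        // stand-in for publish(p) resetting _delta
          }

          def main(args: Array[String]): Unit = startPublishing("publisher-0")
        }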
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Divij Vaidya <di...@amazon.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  9 +---
 .../server/metadata/BrokerMetadataListener.scala   | 39 +++++++++-----
 .../metadata/BrokerMetadataSnapshotter.scala       | 26 +++++----
 .../metadata/BrokerMetadataListenerTest.scala      | 61 ++++++++++++++++++++--
 .../metadata/BrokerMetadataSnapshotterTest.scala   |  6 +--
 5 files changed, 104 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d0d2a98b483..eb21c1ed25e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -62,13 +62,8 @@ class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
     extends SnapshotWriterBuilder {
   override def build(committedOffset: Long,
                      committedEpoch: Int,
-                     lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
-    raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).
-        asScala.getOrElse(
-      throw new RuntimeException("A snapshot already exists with " +
-        s"committedOffset=$committedOffset, committedEpoch=$committedEpoch, " +
-        s"lastContainedLogTime=$lastContainedLogTime")
-    )
+                     lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]] = {
+    raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).asScala
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index fa0bc52d7aa..3b79526a954 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -117,26 +117,34 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
 
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (shouldSnapshot()) {
+        maybeStartSnapshot()
       }
 
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)
     }
   }
 
   private def shouldSnapshot(): Boolean = {
-    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+    (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+  }
+
+  private def metadataVersionChanged(): Boolean = {
+    // The _publisher is empty until we start publishing, and we do not compute the
+    // feature delta until then
+    _publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>
+      featuresDelta.metadataVersionChange().isPresent
+    }
+  }
+
+  private def maybeStartSnapshot(): Unit = {
+    snapshotter.foreach { snapshotter =>
+      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+        _bytesSinceLastSnapshot = 0L
+      }
+    }
   }
 
   /**
@@ -213,7 +221,7 @@ class BrokerMetadataListener(
             s" ${messageAndVersion.message}")
         }
 
-        _highestOffset  = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
+        _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
 
         delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
         numRecords += 1
@@ -244,6 +252,9 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
       try {
+        if (metadataVersionChanged()) {
+          maybeStartSnapshot()
+        }
         publish(publisher)
         future.complete(null)
       } catch {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index b5179c32f14..dd77b277c8b 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.snapshot.SnapshotWriter
 trait SnapshotWriterBuilder {
   def build(committedOffset: Long,
             committedEpoch: Int,
-            lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+            lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]]
 }
 
 class BrokerMetadataSnapshotter(
@@ -51,20 +51,26 @@ class BrokerMetadataSnapshotter(
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
   override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
-    if (_currentSnapshotOffset == -1L) {
+    if (_currentSnapshotOffset != -1) {
+      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
+        s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
+      false
+    } else {
       val writer = writerBuilder.build(
         image.highestOffsetAndEpoch().offset,
         image.highestOffsetAndEpoch().epoch,
         lastContainedLogTime
       )
-      _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
-      info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
-      eventQueue.append(new CreateSnapshotEvent(image, writer))
-      true
-    } else {
-      warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
-           s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
-      false
+      if (writer.nonEmpty) {
+        _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
+        info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
+        eventQueue.append(new CreateSnapshotEvent(image, writer.get))
+        true
+      } else {
+        info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
+          s"there is already a snapshot at offset ${image.highestOffsetAndEpoch().offset}")
+        false
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index f75823a0292..6de448f2802 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -20,14 +20,13 @@ package kafka.server.metadata
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Optional}
-
-import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
@@ -240,6 +239,43 @@ class BrokerMetadataListenerTest {
     }
   }
 
+  @Test
+  def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We should not generate a snapshot on a metadata version change before publishing starts")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    val endOffset = 100L
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), endOffset)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try to generate a snapshot when publishing starts")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+
+    val endOffset = 100L
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, (MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
+    // Waiting for the metadata version update to get processed
+    listener.getImageRecords().get()
+    assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate a snapshot on a feature update")
+  }
+
   private def registerBrokers(
     listener: BrokerMetadataListener,
     brokerIds: Iterable[Int],
@@ -285,4 +321,23 @@ class BrokerMetadataListenerTest {
     )
   }
 
+  private def updateFeature(
+    listener: BrokerMetadataListener,
+    feature: String,
+    version: Short,
+    endOffset: Long
+  ): Unit = {
+    listener.handleCommit(
+      RecordTestUtils.mockBatchReader(
+        endOffset,
+        0,
+        util.Arrays.asList(
+          new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(feature).
+            setFeatureLevel(version), 0.toShort)
+        )
+      )
+    )
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index 82426611425..e6702ee287f 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -20,7 +20,6 @@ package kafka.server.metadata
 import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CountDownLatch}
-
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
@@ -34,6 +33,7 @@ import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter,
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 
+import scala.compat.java8.OptionConverters._
 
 class BrokerMetadataSnapshotterTest {
   @Test
@@ -48,7 +48,7 @@ class BrokerMetadataSnapshotterTest {
 
     override def build(committedOffset: Long,
                        committedEpoch: Int,
-                       lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
+                       lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]] = {
       val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
       RecordsSnapshotWriter.createWithHeader(
         () => {
@@ -62,7 +62,7 @@ class BrokerMetadataSnapshotterTest {
         lastContainedLogTime,
         CompressionType.NONE,
         MetadataRecordSerde.INSTANCE
-      ).get();
+      ).asScala
     }
 
     def consumeSnapshotBuffer(committedOffset: Long, committedEpoch: Int)(buffer: ByteBuffer): Unit = {