Posted to commits@kafka.apache.org by js...@apache.org on 2022/09/13 00:05:51 UTC

[kafka] branch 3.3 updated: KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 89f7f31ac2 KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)
89f7f31ac2 is described below

commit 89f7f31ac26751ab9323568fb08ae231c406ba46
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Sep 12 17:40:29 2022 -0400

    KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)
---
 .../server/metadata/BrokerMetadataListener.scala   | 34 ++++++++++--
 .../metadata/BrokerMetadataListenerTest.scala      | 60 +++++++++++++++++++++-
 2 files changed, 89 insertions(+), 5 deletions(-)
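
In short, the patch decorates the listener's fault handler: the first
metadata fault trips an AtomicBoolean latch, and maybeStartSnapshot() checks
that latch before asking the snapshotter for a new snapshot. A minimal,
self-contained sketch of the pattern follows; the FaultHandler trait and
SnapshotDisablingFaultHandler here are illustrative stand-ins, not the Kafka
classes in the diff below:

    import java.util.concurrent.atomic.AtomicBoolean

    // Stand-in with the same shape as org.apache.kafka.server.fault.FaultHandler.
    trait FaultHandler {
      def handleFault(failureMessage: String, cause: Throwable): RuntimeException
    }

    // Decorates a delegate handler; the first fault permanently disables
    // snapshot generation for the life of the process.
    class SnapshotDisablingFaultHandler(delegate: FaultHandler) extends FaultHandler {
      private val faultOccurred = new AtomicBoolean(false)

      def snapshotsDisabled: Boolean = faultOccurred.get()

      override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
        // compareAndSet guarantees the "disabling" message is logged exactly
        // once, even if several faults arrive concurrently.
        if (faultOccurred.compareAndSet(false, true)) {
          println("Disabling metadata snapshots until this broker is restarted.")
        }
        delegate.handleFault(failureMessage, cause)
      }
    }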

diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 3984f467ed..cf7bf5aed9 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 
 object BrokerMetadataListener {
   val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs"
@@ -41,8 +43,22 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      // If the broker encounters any kind of error while handling metadata records or publishing
+      // a new image, we disable taking new snapshots in order to preserve the local metadata log.
+      // Once a metadata processing error has occurred, the broker is in an indeterminate state.
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }
+  }
+
   private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
   logIdent = logContext.logPrefix()
@@ -147,7 +163,9 @@ class BrokerMetadataListener(
 
   private def maybeStartSnapshot(): Unit = {
     snapshotter.foreach { snapshotter =>
-      if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      if (metadataFaultOccurred.get()) {
+        trace("Not starting metadata snapshot since we previously had an error")
+      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
         _bytesSinceLastSnapshot = 0L
       }
     }
@@ -307,11 +325,21 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
-    _image = _delta.apply()
+    try {
+      _image = _delta.apply()
+    } catch {
+      case t: Throwable =>
+        // If we cannot apply the delta, this publish event will throw and we will not publish a new image.
+        // The broker will continue applying metadata records and attempt to publish again.
+        throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+    }
+
     _delta = new MetadataDelta(_image)
     if (isDebugEnabled) {
       debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
     }
+
+    // This publish call is done with its own try-catch and fault handler
     publisher.publish(delta, _image)
 
     // Update the metrics since the publisher handled the latest image
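
Taken together with the publish() change above, the intended flow is: a
failed delta application is routed through the handler, the handler's
RuntimeException is rethrown so no partial image is published, and every
later snapshot attempt is suppressed. A short demo of that flow, building on
the hedged stand-ins sketched earlier (SnapshotGateDemo and applyDelta are
invented for illustration):

    // Builds on the SnapshotDisablingFaultHandler sketch above.
    object SnapshotGateDemo extends App {
      val handler = new SnapshotDisablingFaultHandler(new FaultHandler {
        override def handleFault(failureMessage: String, cause: Throwable): RuntimeException =
          new RuntimeException(failureMessage, cause)
      })

      // Stand-in for MetadataDelta.apply(); throws to simulate a bad batch.
      def applyDelta(bad: Boolean): String =
        if (bad) throw new IllegalStateException("corrupt metadata record") else "image"

      def publish(bad: Boolean): Unit = {
        val image =
          try applyDelta(bad)
          catch {
            // Report the fault (tripping the latch) and rethrow the handler's
            // exception; the previous image stays in place, so the broker can
            // keep applying records and retry publishing later.
            case t: Throwable =>
              throw handler.handleFault("Error applying metadata delta", t)
          }
        println(s"published $image")
      }

      def maybeStartSnapshot(): Unit =
        if (handler.snapshotsDisabled)
          println("Not starting metadata snapshot since we previously had an error")
        else
          println("starting snapshot")

      publish(bad = false)   // prints: published image
      maybeStartSnapshot()   // prints: starting snapshot
      try publish(bad = true) catch { case _: RuntimeException => () }
      maybeStartSnapshot()   // latch tripped above, so the snapshot is skipped
    }
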
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 92e9fe9a2a..ec1ee66682 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.MockFaultHandler
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, Test}
 
@@ -45,6 +45,7 @@ class BrokerMetadataListenerTest {
     metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
     snapshotter: Option[MetadataSnapshotter] = None,
     maxBytesBetweenSnapshots: Long = 1000000L,
+    faultHandler: FaultHandler = metadataLoadingFaultHandler
   ): BrokerMetadataListener = {
     new BrokerMetadataListener(
       brokerId = 0,
@@ -53,7 +54,7 @@ class BrokerMetadataListenerTest {
       maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
       snapshotter = snapshotter,
       brokerMetrics = metrics,
-      metadataLoadingFaultHandler = metadataLoadingFaultHandler
+      _metadataLoadingFaultHandler = faultHandler
     )
   }
 
@@ -168,6 +169,7 @@ class BrokerMetadataListenerTest {
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+  private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
@@ -192,6 +194,27 @@ class BrokerMetadataListenerTest {
     listener.getImageRecords().get()
   }
 
+  private def generateBadRecords(listener: BrokerMetadataListener,
+                                 endOffset: Long): Unit = {
+    listener.handleCommit(
+      RecordTestUtils.mockBatchReader(
+        endOffset,
+        0,
+        util.Arrays.asList(
+          new ApiMessageAndVersion(new PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(BAR_ID).
+            setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+          new ApiMessageAndVersion(new PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(BAR_ID).
+            setRemovingReplicas(Collections.emptyList()), 0.toShort)
+        )
+      )
+    )
+    listener.getImageRecords().get()
+  }
+
   @Test
   def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
     val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
@@ -289,6 +312,39 @@ class BrokerMetadataListenerTest {
     assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update")
   }
 
+  @Test
+  def testNoSnapshotAfterError(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val faultHandler = new MockFaultHandler("metadata loading")
+
+    val listener = newBrokerMetadataListener(
+      snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L,
+      faultHandler = faultHandler)
+    try {
+      val brokerIds = 0 to 3
+
+      registerBrokers(listener, brokerIds, endOffset = 100L)
+      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
+      listener.getImageRecords().get()
+      assertEquals(200L, listener.highestMetadataOffset)
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      assertEquals(-1L, snapshotter.activeSnapshotOffset)
+
+      // Append invalid records that would normally trigger a snapshot
+      generateBadRecords(listener, 1000L)
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      assertEquals(-1L, snapshotter.activeSnapshotOffset)
+
+      // Generate some records that will not throw an error and verify that still no snapshot is taken
+      generateManyRecords(listener, 2000L)
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      assertEquals(-1L, snapshotter.activeSnapshotOffset)
+    } finally {
+      listener.close()
+    }
+  }
+
   private def registerBrokers(
     listener: BrokerMetadataListener,
     brokerIds: Iterable[Int],