You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/09/13 00:05:51 UTC
[kafka] branch 3.3 updated: KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)
This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 89f7f31ac2 KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)
89f7f31ac2 is described below
commit 89f7f31ac26751ab9323568fb08ae231c406ba46
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Sep 12 17:40:29 2022 -0400
KAFKA-14203 Disable snapshot generation on broker after metadata errors (#12596)
---
.../server/metadata/BrokerMetadataListener.scala | 34 ++++++++++--
.../metadata/BrokerMetadataListenerTest.scala | 60 +++++++++++++++++++++-
2 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 3984f467ed..cf7bf5aed9 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.snapshot.SnapshotReader
+import java.util.concurrent.atomic.AtomicBoolean
+
object BrokerMetadataListener {
val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs"
@@ -41,8 +43,22 @@ class BrokerMetadataListener(
val maxBytesBetweenSnapshots: Long,
val snapshotter: Option[MetadataSnapshotter],
brokerMetrics: BrokerServerMetrics,
- metadataLoadingFaultHandler: FaultHandler
+ _metadataLoadingFaultHandler: FaultHandler
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+ private val metadataFaultOccurred = new AtomicBoolean(false)
+ private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+ override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+ // If the broker hits any kind of error while handling metadata records or publishing a new image,
+ // we will disable taking new snapshots in order to preserve the local metadata log. Once we
+ // encounter a metadata processing error, the broker will be in an undetermined state.
+ if (metadataFaultOccurred.compareAndSet(false, true)) {
+ error("Disabling metadata snapshots until this broker is restarted.")
+ }
+ _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+ }
+ }
+
private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@@ -147,7 +163,9 @@ class BrokerMetadataListener(
private def maybeStartSnapshot(): Unit = {
snapshotter.foreach { snapshotter =>
- if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+ if (metadataFaultOccurred.get()) {
+ trace("Not starting metadata snapshot since we previously had an error")
+ } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
}
@@ -307,11 +325,21 @@ class BrokerMetadataListener(
private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
- _image = _delta.apply()
+ try {
+ _image = _delta.apply()
+ } catch {
+ case t: Throwable =>
+ // If we cannot apply the delta, this publish event will throw and we will not publish a new image.
+ // The broker will continue applying metadata records and attempt to publish again.
+ throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+ }
+
_delta = new MetadataDelta(_image)
if (isDebugEnabled) {
debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
}
+
+ // This publish call is done with its own try-catch and fault handler
publisher.publish(delta, _image)
// Update the metrics since the publisher handled the latest image
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 92e9fe9a2a..ec1ee66682 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.MockFaultHandler
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
@@ -45,6 +45,7 @@ class BrokerMetadataListenerTest {
metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
snapshotter: Option[MetadataSnapshotter] = None,
maxBytesBetweenSnapshots: Long = 1000000L,
+ faultHandler: FaultHandler = metadataLoadingFaultHandler
): BrokerMetadataListener = {
new BrokerMetadataListener(
brokerId = 0,
@@ -53,7 +54,7 @@ class BrokerMetadataListenerTest {
maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
snapshotter = snapshotter,
brokerMetrics = metrics,
- metadataLoadingFaultHandler = metadataLoadingFaultHandler
+ _metadataLoadingFaultHandler = faultHandler
)
}
@@ -168,6 +169,7 @@ class BrokerMetadataListenerTest {
}
private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+ private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
private def generateManyRecords(listener: BrokerMetadataListener,
endOffset: Long): Unit = {
@@ -192,6 +194,27 @@ class BrokerMetadataListenerTest {
listener.getImageRecords().get()
}
+ private def generateBadRecords(listener: BrokerMetadataListener,
+ endOffset: Long): Unit = {
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ endOffset,
+ 0,
+ util.Arrays.asList(
+ new ApiMessageAndVersion(new PartitionChangeRecord().
+ setPartitionId(0).
+ setTopicId(BAR_ID).
+ setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+ new ApiMessageAndVersion(new PartitionChangeRecord().
+ setPartitionId(0).
+ setTopicId(BAR_ID).
+ setRemovingReplicas(Collections.emptyList()), 0.toShort)
+ )
+ )
+ )
+ listener.getImageRecords().get()
+ }
+
@Test
def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
@@ -289,6 +312,39 @@ class BrokerMetadataListenerTest {
assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should generate snapshot on feature update")
}
+ @Test
+ def testNoSnapshotAfterError(): Unit = {
+ val snapshotter = new MockMetadataSnapshotter()
+ val faultHandler = new MockFaultHandler("metadata loading")
+
+ val listener = newBrokerMetadataListener(
+ snapshotter = Some(snapshotter),
+ maxBytesBetweenSnapshots = 1000L,
+ faultHandler = faultHandler)
+ try {
+ val brokerIds = 0 to 3
+
+ registerBrokers(listener, brokerIds, endOffset = 100L)
+ createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
+ listener.getImageRecords().get()
+ assertEquals(200L, listener.highestMetadataOffset)
+ assertEquals(-1L, snapshotter.prevCommittedOffset)
+ assertEquals(-1L, snapshotter.activeSnapshotOffset)
+
+ // Append invalid records that will normally trigger a snapshot
+ generateBadRecords(listener, 1000L)
+ assertEquals(-1L, snapshotter.prevCommittedOffset)
+ assertEquals(-1L, snapshotter.activeSnapshotOffset)
+
+ // Generate some records that will not throw an error, verify still no snapshots
+ generateManyRecords(listener, 2000L)
+ assertEquals(-1L, snapshotter.prevCommittedOffset)
+ assertEquals(-1L, snapshotter.activeSnapshotOffset)
+ } finally {
+ listener.close()
+ }
+ }
+
private def registerBrokers(
listener: BrokerMetadataListener,
brokerIds: Iterable[Int],