Posted to commits@kafka.apache.org by js...@apache.org on 2022/09/13 17:04:14 UTC

[kafka] branch trunk updated: KAFKA-14073; Log the reason for snapshot (#12414)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86645cb40a7 KAFKA-14073; Log the reason for snapshot (#12414)
86645cb40a7 is described below

commit 86645cb40a7f0e3b3e7aed7925f2488c0cbbf1b4
Author: Ashmeet Lamba <as...@gmail.com>
AuthorDate: Tue Sep 13 22:33:47 2022 +0530

    KAFKA-14073; Log the reason for snapshot (#12414)
    
    A snapshot is taken for one or both of the following reasons -
    
        The configured maximum number of bytes has been applied since the last snapshot
        The metadata version was changed
    
    Once the snapshot process is started, it logs the reason(s) that initiated it.
    
    Updated existing tests to cover the code changes required to log the reason. I was not able to check the logs while running tests - could someone guide me on how to enable logging when running a specific test case?
    
    Reviewers: dengziming <de...@gmail.com>, José Armando García Sancio <js...@apache.org>
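
For illustration only, here is a minimal, self-contained Java sketch (not taken from the patch; the class name and the describe helper are hypothetical) of how the new SnapshotReason values combine into the log message format introduced in the diff below:

    import java.util.EnumSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class SnapshotReasonLogExample {
        // Mirrors the new SnapshotReason enum added in this commit.
        enum SnapshotReason {
            UnknownReason("unknown reason"),
            MaxBytesExceeded("max bytes were exceeded"),
            MetadataVersionChanged("metadata version was changed");

            private final String snapshotReason;

            SnapshotReason(String snapshotReason) {
                this.snapshotReason = snapshotReason;
            }

            @Override
            public String toString() {
                return snapshotReason;
            }
        }

        // Hypothetical helper: joins the reasons much like the Scala snapshotter's
        // snapshotReasons.mkString(" and ") does.
        static String describe(Set<SnapshotReason> reasons) {
            return reasons.stream()
                .map(SnapshotReason::toString)
                .collect(Collectors.joining(" and "));
        }

        public static void main(String[] args) {
            Set<SnapshotReason> reasons =
                EnumSet.of(SnapshotReason.MaxBytesExceeded, SnapshotReason.MetadataVersionChanged);
            // Prints something like:
            // Creating a new snapshot because: max bytes were exceeded and metadata version was changed
            System.out.println("Creating a new snapshot because: " + describe(reasons));
        }
    }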
---
 .../server/metadata/BrokerMetadataListener.scala   | 28 +++++++++++++-----
 .../metadata/BrokerMetadataSnapshotter.scala       |  6 ++--
 .../server/metadata/MetadataSnapshotter.scala      |  4 ++-
 .../metadata/BrokerMetadataListenerTest.scala      |  3 +-
 .../metadata/BrokerMetadataSnapshotterTest.scala   | 27 +++++++++++++++--
 .../apache/kafka/controller/QuorumController.java  |  7 +++--
 .../apache/kafka/metadata/util/SnapshotReason.java | 34 ++++++++++++++++++++++
 .../org/apache/kafka/raft/ReplicatedCounter.java   |  2 +-
 8 files changed, 95 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index cf7bf5aed90..a1dfaf218dd 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -22,6 +22,7 @@ import java.util.function.Consumer
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -141,16 +142,29 @@ class BrokerMetadataListener(
       }
 
       _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-      if (shouldSnapshot()) {
-        maybeStartSnapshot()
+      
+      val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
+      if (shouldTakeSnapshot.nonEmpty) {
+        maybeStartSnapshot(shouldTakeSnapshot)
       }
 
       _publisher.foreach(publish)
     }
   }
 
-  private def shouldSnapshot(): Boolean = {
-    (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+  private def shouldSnapshot(): Set[SnapshotReason] = {
+    val metadataVersionHasChanged = metadataVersionChanged()
+    val maxBytesHaveExceeded = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
+
+    if (maxBytesHaveExceeded && metadataVersionHasChanged) {
+      Set(SnapshotReason.MetadataVersionChanged, SnapshotReason.MaxBytesExceeded)
+    } else if (maxBytesHaveExceeded) {
+      Set(SnapshotReason.MaxBytesExceeded)
+    } else if (metadataVersionHasChanged) {
+      Set(SnapshotReason.MetadataVersionChanged)
+    } else {
+      Set()
+    }
   }
 
   private def metadataVersionChanged(): Boolean = {
@@ -161,11 +175,11 @@ class BrokerMetadataListener(
     }
   }
 
-  private def maybeStartSnapshot(): Unit = {
+  private def maybeStartSnapshot(reason: Set[SnapshotReason]): Unit = {
     snapshotter.foreach { snapshotter =>
       if (metadataFaultOccurred.get()) {
         trace("Not starting metadata snapshot since we previously had an error")
-      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
+      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply(), reason)) {
         _bytesSinceLastSnapshot = 0L
       }
     }
@@ -293,7 +307,7 @@ class BrokerMetadataListener(
       log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
       try {
         if (metadataVersionChanged()) {
-          maybeStartSnapshot()
+          maybeStartSnapshot(Set(SnapshotReason.MetadataVersionChanged))
         }
         publish(publisher)
         future.complete(null)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index 2a236ca7497..37a06c17127 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException
 import kafka.utils.Logging
 import org.apache.kafka.image.MetadataImage
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.snapshot.SnapshotWriter
@@ -82,7 +83,7 @@ class BrokerMetadataSnapshotter(
    */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
-  override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
+  override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, snapshotReasons: Set[SnapshotReason]): Boolean = synchronized {
     if (_currentSnapshotOffset != -1) {
       info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
         s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
@@ -95,7 +96,8 @@ class BrokerMetadataSnapshotter(
       )
       if (writer.nonEmpty) {
         _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
-        info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
+
+        info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because, ${snapshotReasons.mkString(" and ")}")
         eventQueue.append(new CreateSnapshotEvent(image, writer.get))
         true
       } else {
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
index c9d72923c88..b175bf78e49 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
@@ -17,6 +17,7 @@
 package kafka.server.metadata
 
 import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.metadata.util.SnapshotReason
 
 
 /**
@@ -28,8 +29,9 @@ trait MetadataSnapshotter {
    *
    * @param lastContainedLogTime  The highest time contained in the snapshot.
    * @param image                 The metadata image to write out.
+   * @param reason                Set of reasons due to which a snapshot is being taken.
    *
    * @return                      True if we will write out a new snapshot; false otherwise.
    */
-  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean
+  def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, reason: Set[SnapshotReason]): Boolean
 }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index ec1ee666820..a589c0572dd 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -137,7 +138,7 @@ class BrokerMetadataListenerTest {
     var prevCommittedEpoch = -1
     var prevLastContainedLogTime = -1L
 
-    override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage): Boolean = {
+    override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage, reason: Set[SnapshotReason]): Boolean = {
       try {
         if (activeSnapshotOffset == -1L) {
           assertTrue(prevCommittedOffset <= newImage.highestOffsetAndEpoch().offset)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index ff2326c92fa..0ee6959a662 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
 import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.EventQueue
 import org.apache.kafka.raft.OffsetAndEpoch
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -96,11 +97,33 @@ class BrokerMetadataSnapshotterTest {
   def testCreateSnapshot(): Unit = {
     val writerBuilder = new MockSnapshotWriterBuilder()
     val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
+    
     try {
       val blockingEvent = new BlockingEvent()
+      val reasons = Set(SnapshotReason.UnknownReason)
+
+      snapshotter.eventQueue.append(blockingEvent)
+      assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
+      assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
+      blockingEvent.latch.countDown()
+      assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
+    } finally {
+      snapshotter.close()
+    }
+  }
+
+  @Test
+  def testCreateSnapshotMultipleReasons(): Unit = {
+    val writerBuilder = new MockSnapshotWriterBuilder()
+    val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
+    
+    try {
+      val blockingEvent = new BlockingEvent()
+      val reasons = Set(SnapshotReason.MaxBytesExceeded, SnapshotReason.MetadataVersionChanged)
+
       snapshotter.eventQueue.append(blockingEvent)
-      assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1))
-      assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2))
+      assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
+      assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
       blockingEvent.latch.countDown()
       assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
     } finally {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ad601b0079..4386f4f654e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -89,6 +89,7 @@ import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.metadata.util.SnapshotReason;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -1465,8 +1466,8 @@ public final class QuorumController implements Controller {
                 snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
             }
 
-            log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot.",
-                lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot);
+            log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
+                lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.MaxBytesExceeded);
 
             snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, lastCommittedEpoch, lastCommittedTimestamp);
             newBytesSinceLastSnapshot = 0;
@@ -2116,6 +2117,8 @@ public final class QuorumController implements Controller {
         CompletableFuture<Long> future = new CompletableFuture<>();
         appendControlEvent("beginWritingSnapshot", () -> {
             if (snapshotGeneratorManager.generator == null) {
+                log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
+                    lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.UnknownReason);
                 snapshotGeneratorManager.createSnapshotGenerator(
                     lastCommittedOffset,
                     lastCommittedEpoch,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
new file mode 100644
index 00000000000..59d95035f2d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.util;
+
+public enum SnapshotReason {
+    UnknownReason("unknown reason"),
+    MaxBytesExceeded("max bytes were exceeded"),
+    MetadataVersionChanged("metadata version was changed");
+
+    private final String snapshotReason;
+
+    SnapshotReason(String snapshotReason) {
+        this.snapshotReason = snapshotReason;
+    }
+
+    @Override
+    public String toString() {
+        return snapshotReason;
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index c7702ba20a5..27b3be163e0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -109,7 +109,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
 
             if (lastOffsetSnapshotted + snapshotDelayInRecords < lastCommittedOffset) {
                 log.debug(
-                    "Generating new snapshot with committed offset {} and epoch {} since the previoud snapshot includes {}",
+                    "Generating new snapshot with committed offset {} and epoch {} since the previous snapshot includes {}",
                     lastCommittedOffset,
                     lastCommittedEpoch,
                     lastOffsetSnapshotted