Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/08 10:16:04 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

divijvaidya commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r892101409


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
 
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (_publisher.nonEmpty && shouldSnapshot()) {
+        maybeStartSnapshot()
       }
 
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)
     }
   }
 
   private def shouldSnapshot(): Boolean = {
-    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots || metadataVersionChanged()

Review Comment:
   Nit:
   
   Personally, I prefer to use parentheses for explicit grouping, e.g. `(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()`, so that the reader doesn't have to work out the precedence of the binary operators. It also helps avoid inadvertent bugs.
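   For illustration, here is a minimal Scala sketch of the two spellings. `SnapshotState` and `SnapshotPolicy` are hypothetical stand-ins for the listener's fields, and only the boolean expression mirrors the diff. Scala gives `>=` higher precedence than `||`, so both versions parse the same way. The parentheses improve readability without changing behavior.

   ```scala
   // Hypothetical stand-in for the listener's snapshot bookkeeping; not Kafka code.
   final case class SnapshotState(
       bytesSinceLastSnapshot: Long,
       maxBytesBetweenSnapshots: Long,
       metadataVersionChanged: Boolean
   )

   object SnapshotPolicy {
     // As written in the PR: relies on `>=` binding tighter than `||`.
     def shouldSnapshotImplicit(s: SnapshotState): Boolean =
       s.bytesSinceLastSnapshot >= s.maxBytesBetweenSnapshots || s.metadataVersionChanged

     // Suggested style: the grouping is spelled out for the reader.
     def shouldSnapshotExplicit(s: SnapshotState): Boolean =
       (s.bytesSinceLastSnapshot >= s.maxBytesBetweenSnapshots) || s.metadataVersionChanged
   }
   ```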



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
+      _publisher.foreach(publish)

Review Comment:
   Could we rename `publish` to `publishAndResetDelta`? That would make it explicit that the publish method resets the state set by `loadBatches`.
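   The following minimal sketch shows why the name matters. It assumes a heavily simplified model in which the delta is just a list of record names. `DeltaHolder` and the `"metadata.version"` string are hypothetical placeholders, not Kafka code. Any check that reads the delta, such as a metadata-version-change test, only sees the loaded records if it runs before `publishAndResetDelta()`.

   ```scala
   // Hypothetical, simplified stand-in for the listener's delta handling; not Kafka code.
   final class DeltaHolder {
     // Records accumulated by loadBatches since the last publish.
     private var delta: Vector[String] = Vector.empty
     private var publishedBatches: Vector[Vector[String]] = Vector.empty

     def loadBatches(records: Seq[String]): Unit = delta = delta ++ records

     // Reads the delta, so the answer is only meaningful before the delta is reset.
     def metadataVersionChanged(): Boolean = delta.contains("metadata.version")

     // The proposed name spells out the side effect: the delta is consumed here.
     def publishAndResetDelta(): Unit = {
       publishedBatches = publishedBatches :+ delta
       delta = Vector.empty
     }

     def publishedCount: Int = publishedBatches.size
   }
   ```

   The diff above follows the same ordering: it moves `_publisher.foreach(publish)` after the snapshot check.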



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -244,6 +248,9 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
       try {
+        if (shouldSnapshot()) {

Review Comment:
   To check my understanding, please let me know whether these assumptions are correct:
   
   A listener can receive a `handleCommit` or a `handleSnapshot` even before it has started publishing, because there is a window between when the metadata listener is registered and when it starts publishing. Is that right?
   
   In such cases (when the listener has not yet started publishing), `handleCommit` will create a snapshot if it detects a metadata version change.
   
   Then why do we need to create a snapshot when publishing starts? Isn't it guaranteed that an earlier `handleCommit` would already have created that snapshot?
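   A toy model of the flow in question may help frame it. The model assumes two pieces of gating from this PR: the `_publisher.nonEmpty && shouldSnapshot()` guard in the first hunk, and the new `shouldSnapshot()` check in `startPublishing`. `ToyListener` and its fields are hypothetical. So is the simplification that publishing clears a pending version change. Under these assumptions, a version change seen before publishing is deferred rather than snapshotted. This matches what `testNotSnapshotBeforePublishing` and `testSnapshotWhenStarting` below assert.

   ```scala
   // Hypothetical toy model; not Kafka code. It mirrors only the snapshot gating.
   final class ToyListener(maxBytesBetweenSnapshots: Long) {
     private var bytesSinceLastSnapshot = 0L
     private var versionChangePending = false // stands in for a features delta in _delta
     private var highestOffset = -1L
     private var publishing = false
     var activeSnapshotOffset = -1L // -1 means no snapshot yet, as in the tests

     private def shouldSnapshot(): Boolean =
       (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || versionChangePending

     private def maybeStartSnapshot(): Unit = {
       activeSnapshotOffset = highestOffset
       bytesSinceLastSnapshot = 0L
     }

     // Assumption: publishing consumes the delta, so a pending version change is cleared.
     private def publishAndResetDelta(): Unit = versionChangePending = false

     def handleCommit(offset: Long, numBytes: Long, changesMetadataVersion: Boolean): Unit = {
       highestOffset = offset
       bytesSinceLastSnapshot += numBytes
       versionChangePending = versionChangePending || changesMetadataVersion
       // Mirrors `if (_publisher.nonEmpty && shouldSnapshot())`: no snapshot before publishing.
       if (publishing && shouldSnapshot()) maybeStartSnapshot()
       if (publishing) publishAndResetDelta()
     }

     def startPublishing(): Unit = {
       publishing = true
       // Mirrors the new check: catch up on a snapshot deferred while not publishing.
       if (shouldSnapshot()) maybeStartSnapshot()
       publishAndResetDelta()
     }
   }
   ```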



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -240,6 +244,40 @@ class BrokerMetadataListenerTest {
     }
   }
 
+  @Test
+  def testNotSnapshotBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot before starting publishing")
+  }
+
+  @Test
+  def testSnapshotWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {

Review Comment:
   Could we also add a negative test here that would have failed before the fix for the bug where `publish` was resetting `_delta`, causing a snapshot on every commit? For example, a test that validates that snapshots do not occur more often than expected.
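   Here is one possible shape for such a negative test. It is sketched against a hypothetical `CountingListener` rather than the real `MockMetadataSnapshotter` harness. After one metadata version change followed by many small commits, exactly one snapshot should be taken. The `clearFlagOnPublish = false` setting is an assumed stand-in for a regression in which a detected version change is never cleared. It is not a reconstruction of the actual bug.

   ```scala
   // Hypothetical toy listener; not Kafka code.
   // clearFlagOnPublish = false simulates a regression where a version change is never cleared.
   final class CountingListener(maxBytesBetweenSnapshots: Long, clearFlagOnPublish: Boolean) {
     private var bytesSinceLastSnapshot = 0L
     private var versionChangePending = false
     var snapshotCount = 0

     def handleCommit(numBytes: Long, changesMetadataVersion: Boolean): Unit = {
       bytesSinceLastSnapshot += numBytes
       versionChangePending = versionChangePending || changesMetadataVersion
       if ((bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || versionChangePending) {
         snapshotCount += 1
         bytesSinceLastSnapshot = 0L
       }
       // Publishing happens after the snapshot check; in the fixed model it clears the change.
       if (clearFlagOnPublish) versionChangePending = false
     }
   }

   object NegativeSnapshotCheck {
     // One version change followed by `smallCommits` small commits; returns the snapshot count.
     def snapshotsAfter(listener: CountingListener, smallCommits: Int): Int = {
       listener.handleCommit(numBytes = 100L, changesMetadataVersion = true)
       (1 to smallCommits).foreach { _ =>
         listener.handleCommit(numBytes = 10L, changesMetadataVersion = false)
       }
       listener.snapshotCount
     }
   }
   ```

   The example uses a 1000-byte threshold and fifty 10-byte commits. Under these settings, the fixed model takes one snapshot, while the regressed model takes one per commit (51). A test asserting a count of 1 would catch that kind of over-snapshotting.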



-- 
This is an automated message from the Apache Git Service.