You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/17 17:03:39 UTC

[kafka] branch trunk updated: MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d944ef1efbd MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
d944ef1efbd is described below

commit d944ef1efbdb577e4c7c847b790df884ba2b58d2
Author: David Mao <dm...@confluent.io>
AuthorDate: Wed May 17 09:57:24 2023 -0700

    MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
    
    Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
    not generation.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/server/metadata/OffsetTrackingListener.scala     |  2 +-
 core/src/main/scala/kafka/tools/TestRaftServer.scala       |  2 +-
 .../java/org/apache/kafka/controller/QuorumController.java |  4 ++--
 .../java/org/apache/kafka/image/loader/MetadataLoader.java | 10 +++++-----
 .../kafka/metadata/migration/KRaftMigrationDriver.java     |  2 +-
 .../kafka/metadata/migration/KRaftMigrationZkWriter.java   |  2 +-
 .../org/apache/kafka/image/loader/MetadataLoaderTest.java  | 12 ++++++------
 .../java/org/apache/kafka/metalog/LocalLogManager.java     |  6 +++---
 .../apache/kafka/metalog/MockMetaLogManagerListener.java   |  2 +-
 .../main/java/org/apache/kafka/raft/KafkaRaftClient.java   |  2 +-
 raft/src/main/java/org/apache/kafka/raft/RaftClient.java   |  4 ++--
 .../main/java/org/apache/kafka/raft/ReplicatedCounter.java | 14 +++++++-------
 .../java/org/apache/kafka/raft/RaftClientTestContext.java  |  2 +-
 .../org/apache/kafka/raft/RaftEventSimulationTest.java     |  2 +-
 .../java/org/apache/kafka/shell/MetadataNodeManager.java   |  2 +-
 15 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
index 056b6f0fec1..9ae11e22ecc 100644
--- a/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
@@ -41,7 +41,7 @@ class OffsetTrackingListener extends RaftClient.Listener[ApiMessageAndVersion] {
     reader.close()
   }
 
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
+  override def handleLoadSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
     _highestOffset = reader.lastContainedLogOffset()
     reader.close()
   }
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 2f480053a6a..be45bae578d 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -180,7 +180,7 @@ class TestRaftServer(
       eventQueue.offer(HandleCommit(reader))
     }
 
-    override def handleSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
+    override def handleLoadSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
       eventQueue.offer(HandleSnapshot(reader))
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 4da4298a146..78b0f6341cf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -994,8 +994,8 @@ public final class QuorumController implements Controller {
         }
 
         @Override
-        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
+        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+            appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
                     String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
                     if (isActiveController()) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index a9dd3ea27d3..768fcb2574b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -415,14 +415,14 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     }
 
     @Override
-    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+    public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
             try {
                 MetadataDelta delta = new MetadataDelta.Builder().
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
+                log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
                         "in {} us.", manifest.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
@@ -432,10 +432,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                             "snapshot at offset " + reader.lastContainedLogOffset(), e);
                     return;
                 }
-                if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
+                if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
+                log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -452,7 +452,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to end up;
                 // failure-prone operations should have individual try/catch blocks around them.
-                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleSnapshot. " +
+                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. " +
                         "Snapshot offset was " + reader.lastContainedLogOffset(), e);
             } finally {
                 reader.close();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index caa030abbce..be467c59592 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -639,7 +639,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             }
 
             if (isSnapshot) {
-                zkMetadataWriter.handleSnapshot(image);
+                zkMetadataWriter.handleLoadSnapshot(image);
             } else {
                 zkMetadataWriter.handleDelta(prevImage, image, delta);
             }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index a324165c897..99174e973df 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -68,7 +68,7 @@ public class KRaftMigrationZkWriter {
         this.operationConsumer = operationConsumer;
     }
 
-    public void handleSnapshot(MetadataImage image) {
+    public void handleLoadSnapshot(MetadataImage image) {
         handleTopicsSnapshot(image.topics());
         handleConfigsSnapshot(image.configs());
         handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index f51080203cf..1eefa15a8c3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -250,7 +250,7 @@ public class MetadataLoaderTest {
                         )
                     )
                 );
-                loader.handleSnapshot(snapshotReader);
+                loader.handleLoadSnapshot(snapshotReader);
             }
             loader.waitForAllEventsToBeHandled();
             if (sameObject) {
@@ -291,7 +291,7 @@ public class MetadataLoaderTest {
                         setName(MetadataVersion.FEATURE_NAME).
                         setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0))));
             assertFalse(snapshotReader.closed);
-            loader.handleSnapshot(snapshotReader);
+            loader.handleLoadSnapshot(snapshotReader);
             loader.waitForAllEventsToBeHandled();
             assertTrue(snapshotReader.closed);
             publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
@@ -355,7 +355,7 @@ public class MetadataLoaderTest {
         if (loader.time() instanceof MockTime) {
             snapshotReader.setTime((MockTime) loader.time());
         }
-        loader.handleSnapshot(snapshotReader);
+        loader.handleLoadSnapshot(snapshotReader);
         loader.waitForAllEventsToBeHandled();
     }
 
@@ -469,7 +469,7 @@ public class MetadataLoaderTest {
                 setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers).get();
-            loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+            loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
                 new MetadataProvenance(200, 100, 4000), asList(
                     asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                         setName(MetadataVersion.FEATURE_NAME).
@@ -539,7 +539,7 @@ public class MetadataLoaderTest {
         MetadataLoader loader,
         long offset
     ) throws Exception {
-        loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+        loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
                 new MetadataProvenance(offset, 100, 4000), asList(
                         asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                                 setName(MetadataVersion.FEATURE_NAME).
@@ -555,7 +555,7 @@ public class MetadataLoaderTest {
         MetadataLoader loader,
         long offset
     ) throws Exception {
-        loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+        loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
                 new MetadataProvenance(offset, 100, 4000), asList(
                         asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                                 setName(MetadataVersion.FEATURE_NAME).
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 492251bb021..46f99db5d1d 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -424,8 +424,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             offset = reader.lastOffset().getAsLong();
         }
 
-        void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            listener.handleSnapshot(reader);
+        void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+            listener.handleLoadSnapshot(reader);
             offset = reader.lastContainedLogOffset();
         }
 
@@ -519,7 +519,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
                             Optional<RawSnapshotReader> snapshot = shared.nextSnapshot(listenerData.offset());
                             if (snapshot.isPresent()) {
                                 log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
-                                listenerData.handleSnapshot(
+                                listenerData.handleLoadSnapshot(
                                     RecordsSnapshotReader.of(
                                         snapshot.get(),
                                         new  MetadataRecordSerde(),
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index 53a10ca8a0f..3d7267d94a5 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -68,7 +68,7 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
     }
 
     @Override
-    public synchronized void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+    public synchronized void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
         long lastCommittedOffset = reader.lastContainedLogOffset();
         try {
             while (reader.hasNext()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2bb4ef9b0d2..94002f36d2a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2531,7 +2531,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             }
 
             logger.debug("Notifying listener {} of snapshot {}", listenerName(), reader.snapshotId());
-            listener.handleSnapshot(reader);
+            listener.handleLoadSnapshot(reader);
         }
 
         /**
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 84567c4efbd..6422fb5f3a0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -58,7 +58,7 @@ public interface RaftClient<T> extends AutoCloseable {
          *
          * @param reader snapshot reader instance which must be iterated and closed
          */
-        void handleSnapshot(SnapshotReader<T> reader);
+        void handleLoadSnapshot(SnapshotReader<T> reader);
 
         /**
          * Called on any change to leadership. This includes both when a leader is elected and
@@ -66,7 +66,7 @@ public interface RaftClient<T> extends AutoCloseable {
          *
          * If this node is the leader, then the notification of leadership will be delayed until
          * the implementation of this interface has caught up to the high-watermark through calls to
-         * {@link #handleSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
+         * {@link #handleLoadSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
          *
          * If this node is not the leader, then this method will be called as soon as possible. In
          * this case the leader may or may not be known for the current epoch.
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 65a3b6526e9..cceb65930ed 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -39,7 +39,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     private OptionalInt claimedEpoch = OptionalInt.empty();
     private long lastOffsetSnapshotted = -1;
 
-    private int handleSnapshotCalls = 0;
+    private int handleLoadSnapshotCalls = 0;
 
     public ReplicatedCounter(
         int nodeId,
@@ -135,7 +135,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     }
 
     @Override
-    public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+    public synchronized void handleLoadSnapshot(SnapshotReader<Integer> reader) {
         try {
             log.debug("Loading snapshot {}", reader.snapshotId());
             while (reader.hasNext()) {
@@ -157,7 +157,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
                 }
             }
             lastOffsetSnapshotted = reader.lastContainedLogOffset();
-            handleSnapshotCalls += 1;
+            handleLoadSnapshotCalls += 1;
             log.debug("Finished loading snapshot. Set value: {}", committed);
         } finally {
             reader.close();
@@ -176,11 +176,11 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
             uncommitted = -1;
             claimedEpoch = OptionalInt.empty();
         }
-        handleSnapshotCalls = 0;
+        handleLoadSnapshotCalls = 0;
     }
 
-    /** Use handleSnapshotCalls to verify leader is never asked to load snapshot */
-    public int handleSnapshotCalls() {
-        return handleSnapshotCalls;
+    /** Use handleLoadSnapshotCalls to verify leader is never asked to load snapshot */
+    public int handleLoadSnapshotCalls() {
+        return handleLoadSnapshotCalls;
     }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 50647f8df3a..06a300f666c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1220,7 +1220,7 @@ public final class RaftClientTestContext {
         }
 
         @Override
-        public void handleSnapshot(SnapshotReader<String> reader) {
+        public void handleLoadSnapshot(SnapshotReader<String> reader) {
             snapshot.ifPresent(snapshot -> assertDoesNotThrow(snapshot::close));
             commits.clear();
             savedBatches.clear();
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index b12cd163d71..250cf221511 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1066,7 +1066,7 @@ public class RaftEventSimulationTest {
         public void verify() {
             for (RaftNode raftNode : cluster.running()) {
                 if (raftNode.counter.isWritable()) {
-                    assertEquals(0, raftNode.counter.handleSnapshotCalls());
+                    assertEquals(0, raftNode.counter.handleLoadSnapshotCalls());
                 }
             }
         }
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 93dc3073fde..583547e2ed3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -113,7 +113,7 @@ public final class MetadataNodeManager implements AutoCloseable {
         }
 
         @Override
-        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
             try {
                 while (reader.hasNext()) {
                     Batch<ApiMessageAndVersion> batch = reader.next();