You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/17 17:03:39 UTC
[kafka] branch trunk updated: MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d944ef1efbd MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
d944ef1efbd is described below
commit d944ef1efbdb577e4c7c847b790df884ba2b58d2
Author: David Mao <dm...@confluent.io>
AuthorDate: Wed May 17 09:57:24 2023 -0700
MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
not generation.
Reviewers: Colin P. McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/server/metadata/OffsetTrackingListener.scala | 2 +-
core/src/main/scala/kafka/tools/TestRaftServer.scala | 2 +-
.../java/org/apache/kafka/controller/QuorumController.java | 4 ++--
.../java/org/apache/kafka/image/loader/MetadataLoader.java | 10 +++++-----
.../kafka/metadata/migration/KRaftMigrationDriver.java | 2 +-
.../kafka/metadata/migration/KRaftMigrationZkWriter.java | 2 +-
.../org/apache/kafka/image/loader/MetadataLoaderTest.java | 12 ++++++------
.../java/org/apache/kafka/metalog/LocalLogManager.java | 6 +++---
.../apache/kafka/metalog/MockMetaLogManagerListener.java | 2 +-
.../main/java/org/apache/kafka/raft/KafkaRaftClient.java | 2 +-
raft/src/main/java/org/apache/kafka/raft/RaftClient.java | 4 ++--
.../main/java/org/apache/kafka/raft/ReplicatedCounter.java | 14 +++++++-------
.../java/org/apache/kafka/raft/RaftClientTestContext.java | 2 +-
.../org/apache/kafka/raft/RaftEventSimulationTest.java | 2 +-
.../java/org/apache/kafka/shell/MetadataNodeManager.java | 2 +-
15 files changed, 34 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
index 056b6f0fec1..9ae11e22ecc 100644
--- a/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
@@ -41,7 +41,7 @@ class OffsetTrackingListener extends RaftClient.Listener[ApiMessageAndVersion] {
reader.close()
}
- override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
+ override def handleLoadSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
_highestOffset = reader.lastContainedLogOffset()
reader.close()
}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 2f480053a6a..be45bae578d 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -180,7 +180,7 @@ class TestRaftServer(
eventQueue.offer(HandleCommit(reader))
}
- override def handleSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
+ override def handleLoadSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
eventQueue.offer(HandleSnapshot(reader))
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 4da4298a146..78b0f6341cf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -994,8 +994,8 @@ public final class QuorumController implements Controller {
}
@Override
- public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
- appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
+ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
try {
String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
if (isActiveController()) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index a9dd3ea27d3..768fcb2574b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -415,14 +415,14 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
}
@Override
- public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> {
try {
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
- log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
+ log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
"in {} us.", manifest.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
try {
@@ -432,10 +432,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
"snapshot at offset " + reader.lastContainedLogOffset(), e);
return;
}
- if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
+ if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
return;
}
- log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
+ log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
@@ -452,7 +452,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them.
- faultHandler.handleFault("Unhandled fault in MetadataLoader#handleSnapshot. " +
+ faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. " +
"Snapshot offset was " + reader.lastContainedLogOffset(), e);
} finally {
reader.close();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index caa030abbce..be467c59592 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -639,7 +639,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
if (isSnapshot) {
- zkMetadataWriter.handleSnapshot(image);
+ zkMetadataWriter.handleLoadSnapshot(image);
} else {
zkMetadataWriter.handleDelta(prevImage, image, delta);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index a324165c897..99174e973df 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -68,7 +68,7 @@ public class KRaftMigrationZkWriter {
this.operationConsumer = operationConsumer;
}
- public void handleSnapshot(MetadataImage image) {
+ public void handleLoadSnapshot(MetadataImage image) {
handleTopicsSnapshot(image.topics());
handleConfigsSnapshot(image.configs());
handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index f51080203cf..1eefa15a8c3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -250,7 +250,7 @@ public class MetadataLoaderTest {
)
)
);
- loader.handleSnapshot(snapshotReader);
+ loader.handleLoadSnapshot(snapshotReader);
}
loader.waitForAllEventsToBeHandled();
if (sameObject) {
@@ -291,7 +291,7 @@ public class MetadataLoaderTest {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0))));
assertFalse(snapshotReader.closed);
- loader.handleSnapshot(snapshotReader);
+ loader.handleLoadSnapshot(snapshotReader);
loader.waitForAllEventsToBeHandled();
assertTrue(snapshotReader.closed);
publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
@@ -355,7 +355,7 @@ public class MetadataLoaderTest {
if (loader.time() instanceof MockTime) {
snapshotReader.setTime((MockTime) loader.time());
}
- loader.handleSnapshot(snapshotReader);
+ loader.handleLoadSnapshot(snapshotReader);
loader.waitForAllEventsToBeHandled();
}
@@ -469,7 +469,7 @@ public class MetadataLoaderTest {
setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) {
loader.installPublishers(publishers).get();
- loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+ loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
new MetadataProvenance(200, 100, 4000), asList(
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
@@ -539,7 +539,7 @@ public class MetadataLoaderTest {
MetadataLoader loader,
long offset
) throws Exception {
- loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+ loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
new MetadataProvenance(offset, 100, 4000), asList(
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
@@ -555,7 +555,7 @@ public class MetadataLoaderTest {
MetadataLoader loader,
long offset
) throws Exception {
- loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+ loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
new MetadataProvenance(offset, 100, 4000), asList(
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 492251bb021..46f99db5d1d 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -424,8 +424,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
offset = reader.lastOffset().getAsLong();
}
- void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
- listener.handleSnapshot(reader);
+ void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ listener.handleLoadSnapshot(reader);
offset = reader.lastContainedLogOffset();
}
@@ -519,7 +519,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
Optional<RawSnapshotReader> snapshot = shared.nextSnapshot(listenerData.offset());
if (snapshot.isPresent()) {
log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
- listenerData.handleSnapshot(
+ listenerData.handleLoadSnapshot(
RecordsSnapshotReader.of(
snapshot.get(),
new MetadataRecordSerde(),
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index 53a10ca8a0f..3d7267d94a5 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -68,7 +68,7 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
}
@Override
- public synchronized void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ public synchronized void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
long lastCommittedOffset = reader.lastContainedLogOffset();
try {
while (reader.hasNext()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2bb4ef9b0d2..94002f36d2a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2531,7 +2531,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
logger.debug("Notifying listener {} of snapshot {}", listenerName(), reader.snapshotId());
- listener.handleSnapshot(reader);
+ listener.handleLoadSnapshot(reader);
}
/**
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 84567c4efbd..6422fb5f3a0 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -58,7 +58,7 @@ public interface RaftClient<T> extends AutoCloseable {
*
* @param reader snapshot reader instance which must be iterated and closed
*/
- void handleSnapshot(SnapshotReader<T> reader);
+ void handleLoadSnapshot(SnapshotReader<T> reader);
/**
* Called on any change to leadership. This includes both when a leader is elected and
@@ -66,7 +66,7 @@ public interface RaftClient<T> extends AutoCloseable {
*
* If this node is the leader, then the notification of leadership will be delayed until
* the implementation of this interface has caught up to the high-watermark through calls to
- * {@link #handleSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
+ * {@link #handleLoadSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
*
* If this node is not the leader, then this method will be called as soon as possible. In
* this case the leader may or may not be known for the current epoch.
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 65a3b6526e9..cceb65930ed 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -39,7 +39,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
private OptionalInt claimedEpoch = OptionalInt.empty();
private long lastOffsetSnapshotted = -1;
- private int handleSnapshotCalls = 0;
+ private int handleLoadSnapshotCalls = 0;
public ReplicatedCounter(
int nodeId,
@@ -135,7 +135,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
}
@Override
- public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+ public synchronized void handleLoadSnapshot(SnapshotReader<Integer> reader) {
try {
log.debug("Loading snapshot {}", reader.snapshotId());
while (reader.hasNext()) {
@@ -157,7 +157,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
}
}
lastOffsetSnapshotted = reader.lastContainedLogOffset();
- handleSnapshotCalls += 1;
+ handleLoadSnapshotCalls += 1;
log.debug("Finished loading snapshot. Set value: {}", committed);
} finally {
reader.close();
@@ -176,11 +176,11 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
uncommitted = -1;
claimedEpoch = OptionalInt.empty();
}
- handleSnapshotCalls = 0;
+ handleLoadSnapshotCalls = 0;
}
- /** Use handleSnapshotCalls to verify leader is never asked to load snapshot */
- public int handleSnapshotCalls() {
- return handleSnapshotCalls;
+ /** Use handleLoadSnapshotCalls to verify leader is never asked to load snapshot */
+ public int handleLoadSnapshotCalls() {
+ return handleLoadSnapshotCalls;
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 50647f8df3a..06a300f666c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1220,7 +1220,7 @@ public final class RaftClientTestContext {
}
@Override
- public void handleSnapshot(SnapshotReader<String> reader) {
+ public void handleLoadSnapshot(SnapshotReader<String> reader) {
snapshot.ifPresent(snapshot -> assertDoesNotThrow(snapshot::close));
commits.clear();
savedBatches.clear();
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index b12cd163d71..250cf221511 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1066,7 +1066,7 @@ public class RaftEventSimulationTest {
public void verify() {
for (RaftNode raftNode : cluster.running()) {
if (raftNode.counter.isWritable()) {
- assertEquals(0, raftNode.counter.handleSnapshotCalls());
+ assertEquals(0, raftNode.counter.handleLoadSnapshotCalls());
}
}
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 93dc3073fde..583547e2ed3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -113,7 +113,7 @@ public final class MetadataNodeManager implements AutoCloseable {
}
@Override
- public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
try {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();