You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/10 13:05:54 UTC
[kafka] branch trunk updated: MINOR: Move some things around in KRaftMigrationDriver (#13978)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 726d277c0aa MINOR: Move some things around in KRaftMigrationDriver (#13978)
726d277c0aa is described below
commit 726d277c0aac6bfc57a16ef240740b7875ce75ae
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Jul 10 09:05:46 2023 -0400
MINOR: Move some things around in KRaftMigrationDriver (#13978)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../metadata/migration/KRaftMigrationDriver.java | 335 ++++++++++-----------
1 file changed, 164 insertions(+), 171 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index de691fa36f9..9cb788cc5db 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -71,7 +71,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
private final Time time;
- private final LogContext logContext;
private final Logger log;
private final int nodeId;
private final MigrationClient zkMigrationClient;
@@ -80,16 +79,15 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
private final FaultHandler faultHandler;
+ private final QuorumFeatures quorumFeatures;
/**
* A callback for when the migration state has been recovered from ZK. This is used to delay the installation of this
* MetadataPublisher with MetadataLoader.
*/
private final Consumer<MetadataPublisher> initialZkLoadHandler;
- private volatile LeaderAndEpoch leaderAndEpoch;
private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
- private volatile QuorumFeatures quorumFeatures;
private volatile boolean firstPublish;
public KRaftMigrationDriver(
@@ -107,14 +105,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
this.time = time;
- this.logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+ LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
this.image = MetadataImage.EMPTY;
this.firstPublish = false;
- this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
this.quorumFeatures = quorumFeatures;
@@ -138,12 +135,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
eventQueue.prepend(new PollEvent());
}
- public void shutdown() throws InterruptedException {
- eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
- log.debug("Shutting down KRaftMigrationDriver");
- eventQueue.close();
- }
-
// Visible for testing
public CompletableFuture<MigrationDriverState> migrationState() {
CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
@@ -303,6 +294,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
migrationState = newState;
}
+ // MetadataPublisher methods
+
@Override
public String name() {
return "KRaftMigrationDriver";
@@ -326,6 +319,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
NO_OP_HANDLER);
}
+ @Override
+ public void close() throws InterruptedException {
+ eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
+ log.debug("Shutting down KRaftMigrationDriver");
+ eventQueue.close();
+ }
+
/**
* Construct and enqueue a {@link MetadataChangeEvent} with a given completion handler. In production use cases,
* this handler is a no-op. This method exists so that we can add additional logic in our unit tests to wait for the
@@ -348,11 +348,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
eventQueue.append(metadataChangeEvent);
}
- @Override
- public void close() throws Exception {
- eventQueue.close();
- }
-
// Events handled by Migration Driver.
abstract class MigrationEvent implements EventQueue.Event {
@SuppressWarnings("ThrowableNotThrown")
@@ -375,50 +370,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
- class PollEvent extends MigrationEvent {
- @Override
- public void run() throws Exception {
- switch (migrationState) {
- case UNINITIALIZED:
- recoverMigrationStateFromZK();
- break;
- case INACTIVE:
- // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
- // tells us that we are the leader.
- break;
- case WAIT_FOR_CONTROLLER_QUORUM:
- eventQueue.append(new WaitForControllerQuorumEvent());
- break;
- case BECOME_CONTROLLER:
- eventQueue.append(new BecomeZkControllerEvent());
- break;
- case WAIT_FOR_BROKERS:
- eventQueue.append(new WaitForZkBrokersEvent());
- break;
- case ZK_MIGRATION:
- eventQueue.append(new MigrateMetadataEvent());
- break;
- case SYNC_KRAFT_TO_ZK:
- eventQueue.append(new SyncKRaftMetadataEvent());
- break;
- case KRAFT_CONTROLLER_TO_BROKER_COMM:
- eventQueue.append(new SendRPCsToBrokersEvent());
- break;
- case DUAL_WRITE:
- // Nothing to do in the PollEvent. If there's metadata change, we use
- // MetadataChange event to drive the writes to Zookeeper.
- break;
- }
-
- // Poll again after some time
- long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
- eventQueue.scheduleDeferred(
- "poll",
- new EventQueue.DeadlineFunction(deadline),
- new PollEvent());
- }
- }
-
/**
* An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until
* this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens
@@ -434,7 +385,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
// We can either be the active controller or just resigned from being the controller.
- KRaftMigrationDriver.this.leaderAndEpoch = leaderAndEpoch;
boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
if (!isActive) {
@@ -458,6 +408,86 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
+ class MetadataChangeEvent extends MigrationEvent {
+ private final MetadataDelta delta;
+ private final MetadataImage image;
+ private final MetadataProvenance provenance;
+ private final boolean isSnapshot;
+ private final Consumer<Throwable> completionHandler;
+
+ MetadataChangeEvent(
+ MetadataDelta delta,
+ MetadataImage image,
+ MetadataProvenance provenance,
+ boolean isSnapshot,
+ Consumer<Throwable> completionHandler
+ ) {
+ this.delta = delta;
+ this.image = image;
+ this.provenance = provenance;
+ this.isSnapshot = isSnapshot;
+ this.completionHandler = completionHandler;
+ }
+
+ @Override
+ public void run() throws Exception {
+ KRaftMigrationDriver.this.firstPublish = true;
+ MetadataImage prevImage = KRaftMigrationDriver.this.image;
+ KRaftMigrationDriver.this.image = image;
+ String metadataType = isSnapshot ? "snapshot" : "delta";
+
+ if (!migrationState.allowDualWrite()) {
+ log.trace("Received metadata {}, but the controller is not in dual-write " +
+ "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
+ completionHandler.accept(null);
+ return;
+ }
+
+ if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+ log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
+ completionHandler.accept(null);
+ return;
+ }
+
+ Map<String, Integer> dualWriteCounts = new TreeMap<>();
+ if (isSnapshot) {
+ zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
+ dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+ } else {
+ zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
+ dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+ }
+ if (dualWriteCounts.isEmpty()) {
+ log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
+ } else {
+ log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts);
+ }
+
+ // Persist the offset of the metadata that was written to ZK
+ ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
+ image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+ applyMigrationOperation("Updating ZK migration state after " + metadataType,
+ state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
+
+ // TODO: Unhappy path: Probably relinquish leadership and let new controller
+ // retry the write?
+ if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+ log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
+ propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+ } else {
+ log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+ }
+
+ completionHandler.accept(null);
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ completionHandler.accept(e);
+ super.handleException(e);
+ }
+ }
+
class WaitForControllerQuorumEvent extends MigrationEvent {
@Override
@@ -503,6 +533,23 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
+ class WaitForZkBrokersEvent extends MigrationEvent {
+ @Override
+ public void run() throws Exception {
+ switch (migrationState) {
+ case WAIT_FOR_BROKERS:
+ if (areZkBrokersReadyForMigration()) {
+ log.debug("Zk brokers are registered and ready for migration");
+ transitionTo(MigrationDriverState.BECOME_CONTROLLER);
+ }
+ break;
+ default:
+ // Ignore the event as we're not in the appropriate state anymore.
+ break;
+ }
+ }
+ }
+
class BecomeZkControllerEvent extends MigrationEvent {
@Override
public void run() throws Exception {
@@ -521,23 +568,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
- class WaitForZkBrokersEvent extends MigrationEvent {
- @Override
- public void run() throws Exception {
- switch (migrationState) {
- case WAIT_FOR_BROKERS:
- if (areZkBrokersReadyForMigration()) {
- log.debug("Zk brokers are registered and ready for migration");
- transitionTo(MigrationDriverState.BECOME_CONTROLLER);
- }
- break;
- default:
- // Ignore the event as we're not in the appropriate state anymore.
- break;
- }
- }
- }
-
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
@@ -590,23 +620,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
- static KRaftMigrationOperationConsumer countingOperationConsumer(
- Map<String, Integer> dualWriteCounts,
- BiConsumer<String, KRaftMigrationOperation> operationConsumer
- ) {
- return (opType, logMsg, operation) -> {
- dualWriteCounts.compute(opType, (key, value) -> {
- if (value == null) {
- return 1;
- } else {
- return value + 1;
- }
- });
- operationConsumer.accept(logMsg, operation);
- };
- }
-
-
class SyncKRaftMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
@@ -614,7 +627,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
log.info("Performing a full metadata sync from KRaft to ZK.");
Map<String, Integer> dualWriteCounts = new TreeMap<>();
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
- dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+ dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
@@ -629,95 +642,59 @@ public class KRaftMigrationDriver implements MetadataPublisher {
if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
- "at offset and epoch {}", image.highestOffsetAndEpoch());
+ "at offset and epoch {}", image.highestOffsetAndEpoch());
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationDriverState.DUAL_WRITE);
} else {
log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
- migrationLeadershipState.offsetAndEpoch());
+ migrationLeadershipState.offsetAndEpoch());
}
}
}
}
- class MetadataChangeEvent extends MigrationEvent {
- private final MetadataDelta delta;
- private final MetadataImage image;
- private final MetadataProvenance provenance;
- private final boolean isSnapshot;
- private final Consumer<Throwable> completionHandler;
-
- MetadataChangeEvent(
- MetadataDelta delta,
- MetadataImage image,
- MetadataProvenance provenance,
- boolean isSnapshot,
- Consumer<Throwable> completionHandler
- ) {
- this.delta = delta;
- this.image = image;
- this.provenance = provenance;
- this.isSnapshot = isSnapshot;
- this.completionHandler = completionHandler;
- }
-
+ class PollEvent extends MigrationEvent {
@Override
public void run() throws Exception {
- KRaftMigrationDriver.this.firstPublish = true;
- MetadataImage prevImage = KRaftMigrationDriver.this.image;
- KRaftMigrationDriver.this.image = image;
- String metadataType = isSnapshot ? "snapshot" : "delta";
-
- if (!migrationState.allowDualWrite()) {
- log.trace("Received metadata {}, but the controller is not in dual-write " +
- "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
- completionHandler.accept(null);
- return;
- }
-
- if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
- log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
- completionHandler.accept(null);
- return;
- }
-
- Map<String, Integer> dualWriteCounts = new TreeMap<>();
- if (isSnapshot) {
- zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
- dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
- } else {
- zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
- dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
- }
- if (dualWriteCounts.isEmpty()) {
- log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
- } else {
- log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts);
- }
-
- // Persist the offset of the metadata that was written to ZK
- ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
- image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
- applyMigrationOperation("Updating ZK migration state after " + metadataType,
- state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
-
- // TODO: Unhappy path: Probably relinquish leadership and let new controller
- // retry the write?
- if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
- log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
- propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
- } else {
- log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+ switch (migrationState) {
+ case UNINITIALIZED:
+ recoverMigrationStateFromZK();
+ break;
+ case INACTIVE:
+ // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
+ // tells us that we are the leader.
+ break;
+ case WAIT_FOR_CONTROLLER_QUORUM:
+ eventQueue.append(new WaitForControllerQuorumEvent());
+ break;
+ case WAIT_FOR_BROKERS:
+ eventQueue.append(new WaitForZkBrokersEvent());
+ break;
+ case BECOME_CONTROLLER:
+ eventQueue.append(new BecomeZkControllerEvent());
+ break;
+ case ZK_MIGRATION:
+ eventQueue.append(new MigrateMetadataEvent());
+ break;
+ case SYNC_KRAFT_TO_ZK:
+ eventQueue.append(new SyncKRaftMetadataEvent());
+ break;
+ case KRAFT_CONTROLLER_TO_BROKER_COMM:
+ eventQueue.append(new SendRPCsToBrokersEvent());
+ break;
+ case DUAL_WRITE:
+ // Nothing to do in the PollEvent. If there's metadata change, we use
+ // MetadataChange event to drive the writes to Zookeeper.
+ break;
}
- completionHandler.accept(null);
- }
-
- @Override
- public void handleException(Throwable e) {
- completionHandler.accept(e);
- super.handleException(e);
+ // Poll again after some time
+ long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
+ eventQueue.scheduleDeferred(
+ "poll",
+ new EventQueue.DeadlineFunction(deadline),
+ new PollEvent());
}
}
@@ -745,4 +722,20 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}).collect(Collectors.joining(","));
return "[" + batchString + "]";
}
+
+ static KRaftMigrationOperationConsumer countingOperationConsumer(
+ Map<String, Integer> dualWriteCounts,
+ BiConsumer<String, KRaftMigrationOperation> operationConsumer
+ ) {
+ return (opType, logMsg, operation) -> {
+ dualWriteCounts.compute(opType, (key, value) -> {
+ if (value == null) {
+ return 1;
+ } else {
+ return value + 1;
+ }
+ });
+ operationConsumer.accept(logMsg, operation);
+ };
+ }
}