You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/10 13:05:54 UTC

[kafka] branch trunk updated: MINOR: Move some things around in KRaftMigrationDriver (#13978)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 726d277c0aa MINOR: Move some things around in KRaftMigrationDriver (#13978)
726d277c0aa is described below

commit 726d277c0aac6bfc57a16ef240740b7875ce75ae
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Jul 10 09:05:46 2023 -0400

    MINOR: Move some things around in KRaftMigrationDriver (#13978)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../metadata/migration/KRaftMigrationDriver.java   | 335 ++++++++++-----------
 1 file changed, 164 insertions(+), 171 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index de691fa36f9..9cb788cc5db 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -71,7 +71,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
 
     private final Time time;
-    private final LogContext logContext;
     private final Logger log;
     private final int nodeId;
     private final MigrationClient zkMigrationClient;
@@ -80,16 +79,15 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private final ZkRecordConsumer zkRecordConsumer;
     private final KafkaEventQueue eventQueue;
     private final FaultHandler faultHandler;
+    private final QuorumFeatures quorumFeatures;
     /**
      * A callback for when the migration state has been recovered from ZK. This is used to delay the installation of this
      * MetadataPublisher with MetadataLoader.
      */
     private final Consumer<MetadataPublisher> initialZkLoadHandler;
-    private volatile LeaderAndEpoch leaderAndEpoch;
     private volatile MigrationDriverState migrationState;
     private volatile ZkMigrationLeadershipState migrationLeadershipState;
     private volatile MetadataImage image;
-    private volatile QuorumFeatures quorumFeatures;
     private volatile boolean firstPublish;
 
     public KRaftMigrationDriver(
@@ -107,14 +105,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.zkMigrationClient = zkMigrationClient;
         this.propagator = propagator;
         this.time = time;
-        this.logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+        LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
         this.log = logContext.logger(KRaftMigrationDriver.class);
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
         this.image = MetadataImage.EMPTY;
         this.firstPublish = false;
-        this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
         this.quorumFeatures = quorumFeatures;
@@ -138,12 +135,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         eventQueue.prepend(new PollEvent());
     }
 
-    public void shutdown() throws InterruptedException {
-        eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
-        log.debug("Shutting down KRaftMigrationDriver");
-        eventQueue.close();
-    }
-
     // Visible for testing
     public CompletableFuture<MigrationDriverState> migrationState() {
         CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
@@ -303,6 +294,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         migrationState = newState;
     }
 
+    // MetadataPublisher methods
+
     @Override
     public String name() {
         return "KRaftMigrationDriver";
@@ -326,6 +319,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             NO_OP_HANDLER);
     }
 
+    @Override
+    public void close() throws InterruptedException {
+        eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
+        log.debug("Shutting down KRaftMigrationDriver");
+        eventQueue.close();
+    }
+
     /**
      * Construct and enqueue a {@link MetadataChangeEvent} with a given completion handler. In production use cases,
      * this handler is a no-op. This method exists so that we can add additional logic in our unit tests to wait for the
@@ -348,11 +348,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         eventQueue.append(metadataChangeEvent);
     }
 
-    @Override
-    public void close() throws Exception {
-        eventQueue.close();
-    }
-
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
         @SuppressWarnings("ThrowableNotThrown")
@@ -375,50 +370,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    class PollEvent extends MigrationEvent {
-        @Override
-        public void run() throws Exception {
-            switch (migrationState) {
-                case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
-                    break;
-                case INACTIVE:
-                    // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
-                    // tells informs us that we are the leader.
-                    break;
-                case WAIT_FOR_CONTROLLER_QUORUM:
-                    eventQueue.append(new WaitForControllerQuorumEvent());
-                    break;
-                case BECOME_CONTROLLER:
-                    eventQueue.append(new BecomeZkControllerEvent());
-                    break;
-                case WAIT_FOR_BROKERS:
-                    eventQueue.append(new WaitForZkBrokersEvent());
-                    break;
-                case ZK_MIGRATION:
-                    eventQueue.append(new MigrateMetadataEvent());
-                    break;
-                case SYNC_KRAFT_TO_ZK:
-                    eventQueue.append(new SyncKRaftMetadataEvent());
-                    break;
-                case KRAFT_CONTROLLER_TO_BROKER_COMM:
-                    eventQueue.append(new SendRPCsToBrokersEvent());
-                    break;
-                case DUAL_WRITE:
-                    // Nothing to do in the PollEvent. If there's metadata change, we use
-                    // MetadataChange event to drive the writes to Zookeeper.
-                    break;
-            }
-
-            // Poll again after some time
-            long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
-            eventQueue.scheduleDeferred(
-                "poll",
-                new EventQueue.DeadlineFunction(deadline),
-                new PollEvent());
-        }
-    }
-
     /**
      * An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until
      * this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens
@@ -434,7 +385,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             // We can either be the active controller or just resigned from being the controller.
-            KRaftMigrationDriver.this.leaderAndEpoch = leaderAndEpoch;
             boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
 
             if (!isActive) {
@@ -458,6 +408,86 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
+    class MetadataChangeEvent extends MigrationEvent {
+        private final MetadataDelta delta;
+        private final MetadataImage image;
+        private final MetadataProvenance provenance;
+        private final boolean isSnapshot;
+        private final Consumer<Throwable> completionHandler;
+
+        MetadataChangeEvent(
+                MetadataDelta delta,
+                MetadataImage image,
+                MetadataProvenance provenance,
+                boolean isSnapshot,
+                Consumer<Throwable> completionHandler
+        ) {
+            this.delta = delta;
+            this.image = image;
+            this.provenance = provenance;
+            this.isSnapshot = isSnapshot;
+            this.completionHandler = completionHandler;
+        }
+
+        @Override
+        public void run() throws Exception {
+            KRaftMigrationDriver.this.firstPublish = true;
+            MetadataImage prevImage = KRaftMigrationDriver.this.image;
+            KRaftMigrationDriver.this.image = image;
+            String metadataType = isSnapshot ? "snapshot" : "delta";
+
+            if (!migrationState.allowDualWrite()) {
+                log.trace("Received metadata {}, but the controller is not in dual-write " +
+                        "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
+                completionHandler.accept(null);
+                return;
+            }
+
+            if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+                log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
+                completionHandler.accept(null);
+                return;
+            }
+
+            Map<String, Integer> dualWriteCounts = new TreeMap<>();
+            if (isSnapshot) {
+                zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
+                        dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+            } else {
+                zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
+                        dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+            }
+            if (dualWriteCounts.isEmpty()) {
+                log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
+            } else {
+                log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts);
+            }
+
+            // Persist the offset of the metadata that was written to ZK
+            ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
+                    image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+            applyMigrationOperation("Updating ZK migration state after " + metadataType,
+                    state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
+
+            // TODO: Unhappy path: Probably relinquish leadership and let new controller
+            //  retry the write?
+            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
+                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+            } else {
+                log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+            }
+
+            completionHandler.accept(null);
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            completionHandler.accept(e);
+            super.handleException(e);
+        }
+    }
+
     class WaitForControllerQuorumEvent extends MigrationEvent {
 
         @Override
@@ -503,6 +533,23 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
+    class WaitForZkBrokersEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case WAIT_FOR_BROKERS:
+                    if (areZkBrokersReadyForMigration()) {
+                        log.debug("Zk brokers are registered and ready for migration");
+                        transitionTo(MigrationDriverState.BECOME_CONTROLLER);
+                    }
+                    break;
+                default:
+                    // Ignore the event as we're not in the appropriate state anymore.
+                    break;
+            }
+        }
+    }
+
     class BecomeZkControllerEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
@@ -521,23 +568,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    class WaitForZkBrokersEvent extends MigrationEvent {
-        @Override
-        public void run() throws Exception {
-            switch (migrationState) {
-                case WAIT_FOR_BROKERS:
-                    if (areZkBrokersReadyForMigration()) {
-                        log.debug("Zk brokers are registered and ready for migration");
-                        transitionTo(MigrationDriverState.BECOME_CONTROLLER);
-                    }
-                    break;
-                default:
-                    // Ignore the event as we're not in the appropriate state anymore.
-                    break;
-            }
-        }
-    }
-
     class MigrateMetadataEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
@@ -590,23 +620,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    static KRaftMigrationOperationConsumer countingOperationConsumer(
-        Map<String, Integer> dualWriteCounts,
-        BiConsumer<String, KRaftMigrationOperation> operationConsumer
-    ) {
-        return (opType, logMsg, operation) -> {
-            dualWriteCounts.compute(opType, (key, value) -> {
-                if (value == null) {
-                    return 1;
-                } else {
-                    return value + 1;
-                }
-            });
-            operationConsumer.accept(logMsg, operation);
-        };
-    }
-
-
     class SyncKRaftMetadataEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
@@ -614,7 +627,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 log.info("Performing a full metadata sync from KRaft to ZK.");
                 Map<String, Integer> dualWriteCounts = new TreeMap<>();
                 zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
-                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+                        dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
                 log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);
                 transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
             }
@@ -629,95 +642,59 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
                 if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
                     log.trace("Sending RPCs to broker before moving to dual-write mode using " +
-                        "at offset and epoch {}", image.highestOffsetAndEpoch());
+                            "at offset and epoch {}", image.highestOffsetAndEpoch());
                     propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
                     // Migration leadership state doesn't change since we're not doing any Zk writes.
                     transitionTo(MigrationDriverState.DUAL_WRITE);
                 } else {
                     log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
-                        migrationLeadershipState.offsetAndEpoch());
+                            migrationLeadershipState.offsetAndEpoch());
                 }
             }
         }
     }
 
-    class MetadataChangeEvent extends MigrationEvent {
-        private final MetadataDelta delta;
-        private final MetadataImage image;
-        private final MetadataProvenance provenance;
-        private final boolean isSnapshot;
-        private final Consumer<Throwable> completionHandler;
-
-        MetadataChangeEvent(
-            MetadataDelta delta,
-            MetadataImage image,
-            MetadataProvenance provenance,
-            boolean isSnapshot,
-            Consumer<Throwable> completionHandler
-        ) {
-            this.delta = delta;
-            this.image = image;
-            this.provenance = provenance;
-            this.isSnapshot = isSnapshot;
-            this.completionHandler = completionHandler;
-        }
-
+    class PollEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
-            KRaftMigrationDriver.this.firstPublish = true;
-            MetadataImage prevImage = KRaftMigrationDriver.this.image;
-            KRaftMigrationDriver.this.image = image;
-            String metadataType = isSnapshot ? "snapshot" : "delta";
-
-            if (!migrationState.allowDualWrite()) {
-                log.trace("Received metadata {}, but the controller is not in dual-write " +
-                    "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
-                completionHandler.accept(null);
-                return;
-            }
-
-            if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
-                log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
-                completionHandler.accept(null);
-                return;
-            }
-
-            Map<String, Integer> dualWriteCounts = new TreeMap<>();
-            if (isSnapshot) {
-                zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
-                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
-            } else {
-                zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
-                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
-            }
-            if (dualWriteCounts.isEmpty()) {
-                log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
-            } else {
-                log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts);
-            }
-
-            // Persist the offset of the metadata that was written to ZK
-            ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
-                image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
-            applyMigrationOperation("Updating ZK migration state after " + metadataType,
-                state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
-
-            // TODO: Unhappy path: Probably relinquish leadership and let new controller
-            //  retry the write?
-            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
-            } else {
-                log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    recoverMigrationStateFromZK();
+                    break;
+                case INACTIVE:
+                    // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
+                    // informs us that we are the leader.
+                    break;
+                case WAIT_FOR_CONTROLLER_QUORUM:
+                    eventQueue.append(new WaitForControllerQuorumEvent());
+                    break;
+                case WAIT_FOR_BROKERS:
+                    eventQueue.append(new WaitForZkBrokersEvent());
+                    break;
+                case BECOME_CONTROLLER:
+                    eventQueue.append(new BecomeZkControllerEvent());
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case SYNC_KRAFT_TO_ZK:
+                    eventQueue.append(new SyncKRaftMetadataEvent());
+                    break;
+                case KRAFT_CONTROLLER_TO_BROKER_COMM:
+                    eventQueue.append(new SendRPCsToBrokersEvent());
+                    break;
+                case DUAL_WRITE:
+                    // Nothing to do in the PollEvent. If there's metadata change, we use
+                    // MetadataChange event to drive the writes to Zookeeper.
+                    break;
             }
 
-            completionHandler.accept(null);
-        }
-
-        @Override
-        public void handleException(Throwable e) {
-            completionHandler.accept(e);
-            super.handleException(e);
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
+            eventQueue.scheduleDeferred(
+                    "poll",
+                    new EventQueue.DeadlineFunction(deadline),
+                    new PollEvent());
         }
     }
 
@@ -745,4 +722,20 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }).collect(Collectors.joining(","));
         return "[" + batchString + "]";
     }
+
+    static KRaftMigrationOperationConsumer countingOperationConsumer(
+        Map<String, Integer> dualWriteCounts,
+        BiConsumer<String, KRaftMigrationOperation> operationConsumer
+    ) {
+        return (opType, logMsg, operation) -> {
+            dualWriteCounts.compute(opType, (key, value) -> {
+                if (value == null) {
+                    return 1;
+                } else {
+                    return value + 1;
+                }
+            });
+            operationConsumer.accept(logMsg, operation);
+        };
+    }
 }