Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/05/24 22:28:11 UTC

[GitHub] [kafka] mumrah opened a new pull request, #13758: KAFKA-15010 ZK migration failover support

mumrah opened a new pull request, #13758:
URL: https://github.com/apache/kafka/pull/13758

   Previously, if the KRaft controller failed over while metadata changes were pending in the KRaftMigrationDriver queue, those writes to ZK were lost. This patch adds snapshot reconciliation so that the new controller can bring ZK back to a state consistent with KRaft after a failover.
   
   Internally this adds a new state to the KRaftMigrationDriver state machine. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1206199732


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -260,37 +293,37 @@ public void visitScramCredential(String userName, ScramMechanism scramMechanism,
         });
 
         changedNonUserEntities.forEach(entity -> {
-            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
-            operationConsumer.accept("Update client quotas for " + entity, migrationState ->
+            Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);
+            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState ->
                 migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
         });
 
         changedUsers.forEach(userName -> {
             ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
-            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
+            Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);

Review Comment:
   There was an NPE here prior to this patch. If the SCRAM credentials changed but the client quotas did not, the `get(entity)` call would return null and the chained `quotaMap()` call would throw.
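
   A minimal sketch of the null-safe lookup pattern, for illustration only. `QuotaImage` and `quotaMapForEntity` are hypothetical stand-ins, and falling back to an empty map is an assumption; the actual `getClientQuotaMapForEntity` in the patch may behave differently.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class QuotaLookupSketch {
    // Hypothetical, simplified stand-in for a per-entity quota image.
    static final class QuotaImage {
        private final Map<String, Double> quotas;

        QuotaImage(Map<String, Double> quotas) {
            this.quotas = quotas;
        }

        Map<String, Double> quotaMap() {
            return quotas;
        }
    }

    // Returns the entity's quotas, or an empty map when the entity has no
    // quota image (assumed fallback), instead of dereferencing a null.
    static Map<String, Double> quotaMapForEntity(Map<String, QuotaImage> entities, String entity) {
        QuotaImage image = entities.get(entity);
        return image == null ? Collections.emptyMap() : image.quotaMap();
    }

    public static void main(String[] args) {
        Map<String, QuotaImage> entities = new HashMap<>();
        entities.put("alice", new QuotaImage(Collections.singletonMap("producer_byte_rate", 1024.0)));

        // "alice" has quotas; "bob" changed SCRAM credentials only and has none.
        System.out.println(quotaMapForEntity(entities, "alice"));
        System.out.println(quotaMapForEntity(entities, "bob"));
    }
}
```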





[GitHub] [kafka] mumrah commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1211796623


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -303,6 +304,14 @@ class ZkMigrationClient(
     new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
   }
 
+  override def readProducerId(): util.Optional[java.lang.Long] = {

Review Comment:
   Oh right, in KRaft when we give out a new block, we persist the next ID in the log.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210817910


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -566,15 +578,46 @@ public void run() throws Exception {
                 ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
-                transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+                applyMigrationOperation("Finished migrating ZK data to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState));
+                transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
                 super.handleException(t);
             }
         }
     }
 
+    static KRaftMigrationOperationConsumer countingOperationConsumer(
+        Map<String, Integer> dualWriteCounts,
+        BiConsumer<String, KRaftMigrationOperation> operationConsumer
+    ) {
+        return (opType, logMsg, operation) -> {
+            dualWriteCounts.compute(opType, (key, value) -> {
+                if (value == null) {
+                    return 1;
+                } else {
+                    return value + 1;
+                }
+            });
+            operationConsumer.accept(logMsg, operation);
+        };
+    }
+
+
+    class SyncKRaftMetadataEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
+                log.info("Performing a full metadata sync from KRaft to ZK.");
+                Map<String, Integer> dualWriteCounts = new HashMap<>();
+                zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
+                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+                log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);

Review Comment:
   hmm, this will be printed out unsorted, right? I guess this is a nitpick but it would be better to sort
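
   One way to get sorted output, sketched under the assumption that a sorted map is acceptable for the counts: back them with a `TreeMap` so the log line comes out in key order. The operation type strings below are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

final class SortedCountsSketch {
    // Increment the counter for one operation type.
    static void count(Map<String, Integer> counts, String opType) {
        counts.merge(opType, 1, Integer::sum);
    }

    public static void main(String[] args) {
        // TreeMap iterates (and prints) its entries in natural key order.
        Map<String, Integer> dualWriteCounts = new TreeMap<>();
        count(dualWriteCounts, "UpdateTopic");        // hypothetical op type
        count(dualWriteCounts, "UpdateClientQuota");  // hypothetical op type
        count(dualWriteCounts, "UpdateTopic");
        System.out.println("ZK writes: " + dualWriteCounts);
    }
}
```

   `Map.merge` also replaces the explicit null check inside the `compute` lambda.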





[GitHub] [kafka] mumrah merged pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah merged PR #13758:
URL: https://github.com/apache/kafka/pull/13758




[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210810667


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -303,6 +304,14 @@ class ZkMigrationClient(
     new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
   }
 
+  override def readProducerId(): util.Optional[java.lang.Long] = {

Review Comment:
   This doesn't seem to be returning nextProducerId, it's returning the current producer id.
   
   Why not just have `writeProducerId` take a `ProducerIdsBlock` object so we don't have to do a bunch of dubious translation?
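
   To illustrate the distinction, here is a rough sketch. `Block` is a hypothetical, simplified stand-in and not Kafka's `ProducerIdsBlock`; the field and method names are assumptions. Passing the whole block around keeps "last ID handed out" and "first ID of the next block" from being confused.

```java
final class ProducerIdBlockSketch {
    // Hypothetical block of producer IDs: [firstId, firstId + size - 1].
    static final class Block {
        final long firstId;
        final int size;

        Block(long firstId, int size) {
            this.firstId = firstId;
            this.size = size;
        }

        // Last ID that belongs to this block.
        long lastId() {
            return firstId + size - 1;
        }

        // First ID of the block that follows this one.
        long nextBlockFirstId() {
            return firstId + size;
        }
    }

    public static void main(String[] args) {
        Block block = new Block(1000L, 1000);
        System.out.println("last id in block:    " + block.lastId());
        System.out.println("next block first id: " + block.nextBlockFirstId());
    }
}
```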





[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210813001


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -149,10 +152,9 @@ CompletableFuture<MigrationDriverState> migrationState() {
     }
 
     private void recoverMigrationStateFromZK() {
-        log.info("Recovering migration state from ZK");
-        applyMigrationOperation("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
+        applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
         String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
-        log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
+        log.info("ZK migration is {}.", maybeDone);

Review Comment:
   I'd really prefer to say something like "Initial ZK load is done" / "Initial ZK load is not done"
   
   We should also change `ZkMigrationLeadershipState.zkMigrationComplete` -> `ZkMigrationLeadershipState.initialZkLoadComplete`
   
   After all, the whole process here is technically "ZK migration", not just the initial load.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210814541


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -566,15 +578,46 @@ public void run() throws Exception {
                 ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
-                transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+                applyMigrationOperation("Finished migrating ZK data to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState));

Review Comment:
   Maybe add a comment here noting that we always go through the `SYNC_KRAFT_TO_ZK` state, even immediately after we've loaded from ZK.


