Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/02/01 21:57:00 UTC

[GitHub] [kafka] mumrah opened a new pull request, #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

mumrah opened a new pull request, #13183:
URL: https://github.com/apache/kafka/pull/13183

   Prior to this patch, the KRaft controller sent an UpdateMetadataRequest (UMR) to the ZK brokers every time it handled a new MetadataDelta in dual-write mode. This patch changes the behavior to send UMR only if the broker or topic metadata has changed, since that is all that we send via UMR.
   
   A unit test for KRaftMigrationDriver is added to validate this behavior. 
   
   Some improved error handling and logging in KRaftMigrationDriver is also included.
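
   The gating described in this PR can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the patch: `DeltaView` and `shouldSendUmr` are hypothetical stand-ins for whatever checks `KRaftMigrationDriver` actually performs on a `MetadataDelta`.

```java
final class UmrGateSketch {
    /** Hypothetical stand-in for the parts of a metadata delta that matter here. */
    static final class DeltaView {
        final boolean brokersChanged;
        final boolean topicsChanged;
        final boolean configsChanged;

        DeltaView(boolean brokersChanged, boolean topicsChanged, boolean configsChanged) {
            this.brokersChanged = brokersChanged;
            this.topicsChanged = topicsChanged;
            this.configsChanged = configsChanged;
        }
    }

    /** UMR only carries broker and topic metadata, so other changes do not need one. */
    static boolean shouldSendUmr(DeltaView delta) {
        return delta.brokersChanged || delta.topicsChanged;
    }

    public static void main(String[] args) {
        DeltaView brokerOnly = new DeltaView(true, false, false);
        DeltaView configOnly = new DeltaView(false, false, true);
        DeltaView topicAndConfig = new DeltaView(false, true, true);

        System.out.println("broker change: " + shouldSendUmr(brokerOnly));
        System.out.println("config change only: " + shouldSendUmr(configOnly));
        System.out.println("topic + config change: " + shouldSendUmr(topicAndConfig));
    }
}
```

   A config-only delta is the case the new unit test exercises: it should still be written to ZK, but it should not trigger RPCs to the brokers.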


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094928271


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -474,6 +472,7 @@ class MetadataChangeEvent extends MigrationEvent {
         private final MetadataImage image;
         private final MetadataProvenance provenance;
         private final boolean isSnapshot;
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   Thanks, looks good now.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094454976


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -474,6 +472,7 @@ class MetadataChangeEvent extends MigrationEvent {
         private final MetadataImage image;
         private final MetadataProvenance provenance;
         private final boolean isSnapshot;
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   If we want to follow this pattern, where the tests in `KRaftMigrationDriverTest` are signalled when this event is processed, and at the same time avoid the cost of adding a future to every `SendRPCsToBrokersEvent`, `enqueueMetadataChangeEvent` could take a future as an argument instead. Prod code would pass an empty/no-op future. Alternatively, the unit tests could loop until the assertions are verified or a timeout is reached.
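
   For the second alternative, a polling helper could look something like the sketch below. It is only an illustration under assumed names (`PollUntilSketch`, `waitUntil`); in the Kafka code base, `TestUtils.waitForCondition` already plays this role.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollUntilSketch {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadlineNs >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger processed = new AtomicInteger();
        // Simulates an event being handled asynchronously on another thread.
        Thread worker = new Thread(processed::incrementAndGet);
        worker.start();
        boolean ok = waitUntil(() -> processed.get() == 1, 5_000, 10);
        worker.join();
        System.out.println("condition met: " + ok);
    }
}
```

   The trade-off is that polling adds up to one poll interval of latency per wait, whereas a completion callback signals the test as soon as the event has run.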





[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419445476

   @FireBurn, no, it hasn't been delayed. We have an early access version coming out in 3.4.0, which will be announced this week. We plan on back-porting the remaining ZK migration work to the 3.4.x line so we can continue releasing without waiting for the next major release.




[GitHub] [kafka] mumrah commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094614887


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -250,7 +250,7 @@ private void handleEvents() throws InterruptedException {
                             continue;
                         } else if (shuttingDown) {
                             remove(eventContext);
-                            toDeliver = new TimeoutException();
+                            toDeliver = new QueueClosingException();

Review Comment:
   I thought about `RejectedExecutionException`, but wasn't sure if we relied on the `TimeoutException` type somewhere, so I made a subclass of that. If you think it's fine, we'll go with `RejectedExecutionException`. 
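
   To illustrate the compatibility concern: a handler that only checks for `TimeoutException` keeps working with a subclass, but sees `RejectedExecutionException` as a different type. The sketch below uses `java.util.concurrent.TimeoutException` as a stand-in (the exception type the queue really uses is not shown in this diff), and `classify` is a hypothetical handler, not code from Kafka.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

final class ShutdownExceptionSketch {
    /** Stand-in for the first approach: a subclass that still "is a" TimeoutException. */
    static final class QueueClosingException extends TimeoutException {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical handler that branches on the type of the delivered exception. */
    static String classify(Throwable t) {
        if (t instanceof TimeoutException) {
            return "timeout";
        } else if (t instanceof RejectedExecutionException) {
            return "rejected";
        }
        return "other";
    }

    public static void main(String[] args) {
        // The subclass is still treated as a timeout by existing handlers.
        System.out.println(classify(new QueueClosingException()));       // prints "timeout"
        // RejectedExecutionException takes a different branch.
        System.out.println(classify(new RejectedExecutionException()));  // prints "rejected"
    }
}
```

   So switching to `RejectedExecutionException` is only safe if no caller depends on receiving a `TimeoutException` at shutdown.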





[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1413948935

   Updates:
   * Factored out the CompletableFuture from the MetadataChangeEvent so it's only used in the unit test
   * Removed QueueClosingException, replaced with RejectedExecutionException, updated unit test




[GitHub] [kafka] mumrah commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1101861663


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -274,11 +310,6 @@ public void run() throws Exception {
                 new EventQueue.DeadlineFunction(deadline),
                 new PollEvent());
         }
-

Review Comment:
   The base MigrationEvent will do the error logging. I was just removing some redundant overrides. 
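
   Roughly, the idea is the pattern sketched below: the base class logs failures once, so subclasses that only repeated the same logging do not need their own `handleException`. This is a simplified illustration with assumed names (`BaseEvent`, an in-memory `LOG` list, a failing `PollEvent`), not the actual `MigrationEvent` code.

```java
import java.util.ArrayList;
import java.util.List;

final class BaseEventLoggingSketch {
    /** Collects log lines so the example is self-contained; real code would use a logger. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical base class: logs every failure, so subclasses need no override. */
    abstract static class BaseEvent {
        abstract void run() throws Exception;

        void handleException(Throwable e) {
            LOG.add("Error in " + getClass().getSimpleName() + ": " + e.getMessage());
        }
    }

    /** Simplified event that always fails; it inherits the base error logging. */
    static final class PollEvent extends BaseEvent {
        @Override
        void run() throws Exception {
            throw new IllegalStateException("boom");
        }
    }

    /** Runs an event and routes any failure to its (inherited) exception handler. */
    static void process(BaseEvent event) {
        try {
            event.run();
        } catch (Exception e) {
            event.handleException(e);
        }
    }

    public static void main(String[] args) {
        process(new PollEvent());
        LOG.forEach(System.out::println);
    }
}
```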





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1093791731


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -250,7 +250,7 @@ private void handleEvents() throws InterruptedException {
                             continue;
                         } else if (shuttingDown) {
                             remove(eventContext);
-                            toDeliver = new TimeoutException();
+                            toDeliver = new QueueClosingException();

Review Comment:
   Hmm, I'm not really sure about having a new exception here. Can we just use `RejectedExecutionException` here?





[GitHub] [kafka] mumrah commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094617299


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -474,6 +472,7 @@ class MetadataChangeEvent extends MigrationEvent {
         private final MetadataImage image;
         private final MetadataProvenance provenance;
         private final boolean isSnapshot;
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   It's used to avoid having a bunch of wait-for conditions in the tests. Let me try your suggestion @Hangleton





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094922008


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class KRaftMigrationDriverTest {
+    class NoOpRecordConsumer implements ZkRecordConsumer {
+        @Override
+        public void beginMigration() {
+
+        }
+
+        @Override
+        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+            return null;
+        }
+
+        @Override
+        public OffsetAndEpoch completeMigration() {
+            return new OffsetAndEpoch(100, 1);
+        }
+
+        @Override
+        public void abortMigration() {
+
+        }
+    }
+
+    class CapturingMigrationClient implements MigrationClient {
+
+        private final Set<Integer> brokerIds;
+        public final Map<ConfigResource, Map<String, String>> capturedConfigs = new HashMap<>();
+
+        public CapturingMigrationClient(Set<Integer> brokerIdsInZk) {
+            this.brokerIds = brokerIdsInZk;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
+            return initialState;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState createTopic(
+            String topicName,
+            Uuid topicId,
+            Map<Integer, PartitionRegistration> topicPartitions,
+            ZkMigrationLeadershipState state
+        ) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState updateTopicPartitions(
+            Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
+            ZkMigrationLeadershipState state
+        ) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState writeConfigs(
+            ConfigResource configResource,
+            Map<String, String> configMap,
+            ZkMigrationLeadershipState state
+        ) {
+            capturedConfigs.computeIfAbsent(configResource, __ -> new HashMap<>()).putAll(configMap);
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState writeClientQuotas(
+            Map<String, String> clientQuotaEntity,
+            Map<String, Double> quotas,
+            ZkMigrationLeadershipState state
+        ) {
+            return state;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState writeProducerId(
+            long nextProducerId,
+            ZkMigrationLeadershipState state
+        ) {
+            return state;
+        }
+
+        @Override
+        public void readAllMetadata(
+            Consumer<List<ApiMessageAndVersion>> batchConsumer,
+            Consumer<Integer> brokerIdConsumer
+        ) {
+
+        }
+
+        @Override
+        public Set<Integer> readBrokerIds() {
+            return brokerIds;
+        }
+
+        @Override
+        public Set<Integer> readBrokerIdsFromTopicAssignments() {
+            return brokerIds;
+        }
+
+        @Override
+        public ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(
+            MetadataDelta delta,
+            MetadataImage image,
+            ZkMigrationLeadershipState state
+        ) {
+            return state;
+        }
+    }
+
+    class CountingMetadataPropagator implements LegacyPropagator {
+
+        public int deltas = 0;
+        public int images = 0;
+
+        @Override
+        public void startup() {
+
+        }
+
+        @Override
+        public void shutdown() {
+
+        }
+
+        @Override
+        public void publishMetadata(MetadataImage image) {
+
+        }
+
+        @Override
+        public void sendRPCsToBrokersFromMetadataDelta(
+            MetadataDelta delta,
+            MetadataImage image,
+            int zkControllerEpoch
+        ) {
+            deltas += 1;
+        }
+
+        @Override
+        public void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch) {
+            images += 1;
+        }
+
+        @Override
+        public void clear() {
+
+        }
+
+        @Override
+        public void setMetadataVersion(MetadataVersion metadataVersion) {
+
+        }
+    }
+
+    RegisterBrokerRecord zkBrokerRecord(int id) {
+        RegisterBrokerRecord record = new RegisterBrokerRecord();
+        record.setBrokerId(id);
+        record.setIsMigratingZkBroker(true);
+        record.setFenced(false);
+        return record;
+    }
+
+    /**
+     * Enqueues a metadata change event with the migration driver and returns a future that can be waited on in
+     * the test code. The future will complete once the metadata change event executes completely.
+     */
+    CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(
+        KRaftMigrationDriver driver,
+        MetadataDelta delta,
+        MetadataImage newImage,
+        MetadataProvenance provenance
+    ) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Consumer<Throwable> completionHandler = ex -> {
+            if (ex == null) {
+                future.complete(null);
+            } else {
+                future.completeExceptionally(ex);
+            }
+        };
+
+        driver.enqueueMetadataChangeEvent(delta, newImage, provenance, false, completionHandler);
+        return future;
+    }
+
+    @Test
+    public void testOnlySendNeededRPCsToBrokers() throws Exception {
+        // KAFKA-14668 Don't send RPCs to brokers for every metadata change, only when broker or topics change.
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
+        KRaftMigrationDriver driver = new KRaftMigrationDriver(
+            3000,
+            new NoOpRecordConsumer(),
+            migrationClient,
+            metadataPropagator,
+            metadataPublisher -> { },
+            new MockFaultHandler("test")
+        );
+
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        driver.start();
+        delta.replay(zkBrokerRecord(1));
+        delta.replay(zkBrokerRecord(2));
+        delta.replay(zkBrokerRecord(3));
+        MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+        image = delta.apply(provenance);
+
+        // Publish a delta with this node (3000) as the leader
+        driver.publishLogDelta(delta, image, new LogDeltaManifest(provenance,
+            new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
+
+        TestUtils.waitForCondition(() -> driver.migrationState().get(10, TimeUnit.SECONDS).equals(MigrationState.DUAL_WRITE),
+            "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+
+        Assertions.assertEquals(1, metadataPropagator.images);
+        Assertions.assertEquals(0, metadataPropagator.deltas);
+
+        delta = new MetadataDelta(image);
+        delta.replay(new ConfigRecord()
+            .setResourceType(ConfigResource.Type.BROKER.id())
+            .setResourceName("1")
+            .setName("foo")
+            .setValue("bar"));
+        provenance = new MetadataProvenance(120, 1, 2);
+        image = delta.apply(provenance);
+        enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(10, TimeUnit.SECONDS);

Review Comment:
   Maybe 1 minute as a timeout? The Jenkins environment can get wacky, so short timeouts are almost never a good idea.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1093789522


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -474,6 +472,7 @@ class MetadataChangeEvent extends MigrationEvent {
         private final MetadataImage image;
         private final MetadataProvenance provenance;
         private final boolean isSnapshot;
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   Do we need this? It's expensive, and I can find 0 uses of it.





[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "FireBurn (via GitHub)" <gi...@apache.org>.
FireBurn commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419452307

   Fab, I've just built 3.4.0 from the tag but couldn't find the migration docs, so I was worried it had been delayed to 3.4.1.




[GitHub] [kafka] akhileshchg commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "akhileshchg (via GitHub)" <gi...@apache.org>.
akhileshchg commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1101829449


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -274,11 +310,6 @@ public void run() throws Exception {
                 new EventQueue.DeadlineFunction(deadline),
                 new PollEvent());
         }
-

Review Comment:
   Why are we not handling exceptions in other events?





[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419529065

   Gotcha, yeah, the docs are still in progress. We'll have something published soon (hopefully before the release announcement).




[GitHub] [kafka] mumrah merged pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah merged PR #13183:
URL: https://github.com/apache/kafka/pull/13183




[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094915210


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##########
@@ -0,0 +1,315 @@
+        // KAFKA-14668 Don't send RPCs to brokers for every metadata change, only when broker or topics change.

Review Comment:
   Maybe a JavaDoc that explains "This is a regression test for KAFKA-14668..."





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094930020


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -250,7 +250,7 @@ private void handleEvents() throws InterruptedException {
                             continue;
                         } else if (shuttingDown) {
                             remove(eventContext);
-                            toDeliver = new TimeoutException();
+                            toDeliver = new QueueClosingException();

Review Comment:
   I think `RejectedExecutionException` really is more appropriate. If you tried to schedule (or reschedule) a deferred task and the queue was shut down, you'd get `RejectedExecutionException` already. So you already have to catch REE and treat it this way.
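
A minimal sketch of the convention referred to above, using the JDK's own `ExecutorService` as an analogy rather than `KafkaEventQueue` itself: once an executor has been shut down, submitting new work fails with `RejectedExecutionException`. The class and method names below are hypothetical and exist only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration class; not part of Kafka.
class RejectedAfterShutdownSketch {

    // Returns true if handing work to an already shut-down executor is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            // The default rejection policy (AbortPolicy) throws here.
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            // Callers that schedule work already have to handle this case.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + submitAfterShutdownIsRejected());
    }
}
```

The sketch only shows why callers that schedule work on a closable queue usually need a handler for this exception type anyway; whether the event queue should deliver it to pending deferred events is the design question under discussion.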





[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "FireBurn (via GitHub)" <gi...@apache.org>.
FireBurn commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419426113

   Has the ZK migration to KRaft been delayed?




[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1094918202


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -538,12 +553,23 @@ public void run() throws Exception {
 
                 // TODO: Unhappy path: Probably relinquish leadership and let new controller
                 //  retry the write?
-                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
-                        migrationLeadershipState.zkControllerEpoch());
+                if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+                    log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
+                    propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
+                            migrationLeadershipState.zkControllerEpoch());
+                } else {
+                    log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata changes", metadataType);

Review Comment:
   nitpick: perhaps "since no relevant metadata has changed"
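
For context, the condition in the hunk above can be sketched in isolation as follows. This is a simplified model, not the real code: `Delta` is a hypothetical stand-in for `MetadataDelta`, and it assumes, as the diff suggests, that a sub-delta is `null` when that part of the metadata did not change.

```java
// Hypothetical illustration class; not part of Kafka.
class UmrGatingSketch {

    // Stand-in for MetadataDelta. A field is null when that part did not change.
    static final class Delta {
        final Object topicsDelta;
        final Object clusterDelta;
        final Object configsDelta;

        Delta(Object topicsDelta, Object clusterDelta, Object configsDelta) {
            this.topicsDelta = topicsDelta;
            this.clusterDelta = clusterDelta;
            this.configsDelta = configsDelta;
        }
    }

    // RPCs are only needed when topic or broker (cluster) metadata changed.
    static boolean shouldSendRpcs(Delta delta) {
        return delta.topicsDelta != null || delta.clusterDelta != null;
    }

    public static void main(String[] args) {
        Delta topicChange = new Delta("topics", null, null);
        Delta configOnly = new Delta(null, null, "configs");
        System.out.println("topic change -> send: " + shouldSendRpcs(topicChange));
        System.out.println("config only  -> send: " + shouldSendRpcs(configOnly));
    }
}
```

A config-only delta takes the `else` branch in the patch, which is where the log message discussed in this comment is emitted.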





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1093792394


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -250,7 +250,7 @@ private void handleEvents() throws InterruptedException {
                             continue;
                         } else if (shuttingDown) {
                             remove(eventContext);
-                            toDeliver = new TimeoutException();
+                            toDeliver = new QueueClosingException();

Review Comment:
   Also we will need a test in `KafkaEventQueueTest` if you want to do this. I do wonder if this should be a separate PR ... queue changes have a way of being big. But I guess let's see if it impacts any other tests.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13183:
URL: https://github.com/apache/kafka/pull/13183#discussion_r1093787328


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -106,6 +109,13 @@ public void shutdown() throws InterruptedException {
         eventQueue.close();
     }
 
+    // Visible for testing
+    Future<MigrationState> migrationState() {

Review Comment:
   Would be better if the return type were `CompletableFuture`.
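
One likely motivation, sketched under assumptions: a `CompletableFuture` return type lets test code chain transformations and call `join()` without handling checked exceptions, which the plain `Future` interface does not offer. The `State` enum and method names below are hypothetical stand-ins, not the driver's actual API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of Kafka.
class FutureReturnTypeSketch {

    // Stand-in for the driver's migration state type.
    enum State { INACTIVE, DUAL_WRITE }

    // Narrow return type: callers can only poll, cancel, or block with get().
    static Future<State> asPlainFuture(State state) {
        return CompletableFuture.completedFuture(state);
    }

    // Wider return type: callers can also compose with thenApply, join, etc.
    static CompletableFuture<State> asCompletableFuture(State state) {
        return CompletableFuture.completedFuture(state);
    }

    // Composition that is only possible with the CompletableFuture return type.
    static String describe(CompletableFuture<State> future) {
        return future.thenApply(state -> "state=" + state).join();
    }

    public static void main(String[] args) throws Exception {
        // With Future, get() declares checked exceptions the caller must handle.
        System.out.println(asPlainFuture(State.INACTIVE).get());
        System.out.println(describe(asCompletableFuture(State.DUAL_WRITE)));
    }
}
```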


