Posted to jira@kafka.apache.org by "showuon (via GitHub)" <gi...@apache.org> on 2023/05/02 12:11:22 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

showuon commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1182450255


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##########
@@ -534,5 +421,157 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
                 "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
         }
     }
-}
 
+    @FunctionalInterface
+    interface TopicDualWriteVerifier {
+        void verify(
+            KRaftMigrationDriver driver,
+            CapturingTopicMigrationClient topicClient,
+            CapturingConfigMigrationClient configClient
+        ) throws Exception;
+    }
+
+    public void setupTopicDualWrite(TopicDualWriteVerifier verifier) throws Exception {
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
+                    Map<Integer, List<Integer>> assignment = new HashMap<>();
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        assignment.put(partitionId, IntStream.of(partitionRegistration.replicas).boxed().collect(Collectors.toList()))
+                    );
+                    visitor.visitTopic(topicName, topicImage.id(), assignment);
+
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        visitor.visitPartition(new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId)), partitionRegistration)
+                    );
+                });
+            }
+        };
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0, 1, 2, 3, 4, 5)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .build();
+
+        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+            3000,
+            new NoOpRecordConsumer(),
+            migrationClient,
+            metadataPropagator,
+            metadataPublisher -> { },
+            new MockFaultHandler("test"),
+            quorumFeatures,
+            mockTime
+        )) {
+            verifier.verify(driver, topicClient, configClient);
+        }
+    }
+
+    @Test
+    public void testTopicDualWriteSnapshot() throws Exception {
+        setupTopicDualWrite((driver, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+            // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));
+            driver.migrationState().get(1, TimeUnit.MINUTES);

Review Comment:
   What's the purpose of calling `driver.migrationState().get(...)` here and discarding the result? I guess it's to make sure all the pending events have been handled successfully before the test continues, right?
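
   To make the guess above concrete, here is a minimal sketch of the pattern. It assumes (the diff does not confirm this) that the driver handles events one at a time, in FIFO order, on a single thread, and that `migrationState()` enqueues an event of its own that completes the returned future. `QueueBarrierSketch`, its `State` enum, and `handledEvents()` are hypothetical stand-ins, not the real `KRaftMigrationDriver` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a driver that processes events on one thread.
class QueueBarrierSketch {
    enum State { INACTIVE, DUAL_WRITE }

    // A single-threaded executor runs submitted tasks sequentially, in submission order.
    private final ExecutorService eventQueue = Executors.newSingleThreadExecutor();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private volatile State state = State.INACTIVE;

    // Enqueues an event and returns immediately; the work happens later on the event thread.
    void onMetadataUpdate(String name) {
        eventQueue.execute(() -> {
            handled.add(name);
            state = State.DUAL_WRITE;
        });
    }

    // Assumed behaviour: the state is read by an event appended to the same queue,
    // so the future completes only after every earlier event has run.
    CompletableFuture<State> migrationState() {
        CompletableFuture<State> future = new CompletableFuture<>();
        eventQueue.execute(() -> future.complete(state));
        return future;
    }

    // Returns a copy of the names of the events handled so far.
    List<String> handledEvents() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    void close() {
        eventQueue.shutdown();
    }

    public static void main(String[] args) throws Exception {
        QueueBarrierSketch driver = new QueueBarrierSketch();
        driver.onMetadataUpdate("snapshot@200");
        // The returned state is ignored; the call only acts as a barrier.
        driver.migrationState().get(1, TimeUnit.MINUTES);
        System.out.println(driver.handledEvents());
        driver.close();
    }
}
```

   Under those assumptions, waiting on the future is a barrier: once `get` returns, the snapshot event queued before it has already been handled, so the assertions that follow do not race with the event thread.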


