Posted to commits@kafka.apache.org by jg...@apache.org on 2024/02/26 23:57:18 UTC

(kafka) branch trunk updated: KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ddfcc333f8d KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)
ddfcc333f8d is described below

commit ddfcc333f8d1c2f4ab0a88abf6d664153d6e82fd
Author: Mayank Shekhar Narula <42...@users.noreply.github.com>
AuthorDate: Mon Feb 26 23:57:11 2024 +0000

    KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)
    
    Add test for concurrently updating Metadata and fetching snapshot/cluster
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
    
    Co-authored-by: Zhifeng Chen <er...@gmail.com>
---
 .../org/apache/kafka/clients/MetadataTest.java     | 106 +++++++++++++++++++++
 1 file changed, 106 insertions(+)
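
In outline, the new test exercises the following pattern: one thread applies a metadata
update while another waits for at least one update to land and then fetches the resulting
snapshot. The sketch below is a minimal illustration condensed from the diff that follows;
the backoff values, node count, topic name and leader epoch are arbitrary placeholders,
and the imports are the ones already used in MetadataTest.

    // Writer thread: apply a metadata update; reader thread: wait for it, then fetch the snapshot.
    Time time = new MockTime();
    Metadata metadata = new Metadata(100L, 1_000L, 60_000L, new LogContext(), new ClusterResourceListeners());
    Map<String, Uuid> topicIds = Collections.singletonMap("test_topic", Uuid.randomUuid());

    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch updatedOnce = new CountDownLatch(1);
    AtomicReference<MetadataSnapshot> observed = new AtomicReference<>();

    pool.execute(() -> {
        // Apply an update carrying 3 nodes, 1 partition of "test_topic" and leader epoch 100.
        MetadataResponse response = RequestTestUtils.metadataUpdateWithIds(
            "cluster", 3, Collections.emptyMap(),
            Collections.singletonMap("test_topic", 1), tp -> 100, topicIds);
        metadata.updateWithCurrentRequestVersion(response, true, time.milliseconds());
        updatedOnce.countDown();
    });
    pool.execute(() -> {
        try {
            // Wait until at least one update has been applied.
            updatedOnce.await(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The snapshot fetched here should reflect the update applied above.
        observed.set(metadata.fetchMetadataSnapshot());
    });
    pool.shutdown();

The committed test below scales this up to 6 threads (3 updaters, 3 readers) and asserts
that the fetched MetadataSnapshot and Cluster reflect the higher node count, partition
counts and leader epoch.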

diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 600fc23ecb9..e6db9685eb5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.clients;
 
+import java.util.OptionalInt;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.Node;
@@ -1260,6 +1265,107 @@ public class MetadataTest {
         Mockito.reset(mockListener);
     }
 
+    /**
+     * Test that concurrently updating Metadata and fetching the corresponding MetadataSnapshot & Cluster work as
+     * expected, i.e. the snapshot & cluster contain the relevant updates.
+     */
+    @Test
+    public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException {
+        Time time = new MockTime();
+        metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+        // Set up metadata with 10 nodes and 2 topics (topic1 & topic2), both retained in the update. Both will have leader-epoch 100.
+        int oldNodeCount = 10;
+        String topic1 = "test_topic1";
+        String topic2 = "test_topic2";
+        TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        int oldPartitionCount = 1;
+        topicPartitionCounts.put(topic1, oldPartitionCount);
+        topicPartitionCounts.put(topic2, oldPartitionCount);
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topic1, Uuid.randomUuid());
+        topicIds.put(topic2, Uuid.randomUuid());
+        int oldLeaderEpoch = 100;
+        MetadataResponse metadataResponse =
+            RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+        Cluster cluster = metadata.fetch();
+        // Validate metadata snapshot & cluster are set up as expected.
+        assertEquals(cluster, snapshot.cluster());
+        assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1));
+        assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2));
+        assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0));
+
+        // Set up 6 threads: 3 update metadata & 3 read the snapshot/cluster.
+        // Metadata will be updated with a higher number of nodes, partition counts & leader epoch.
+        int numThreads = 6;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);
+        CountDownLatch allThreadsDoneLatch = new CountDownLatch(numThreads);
+        CountDownLatch atleastMetadataUpdatedOnceLatch = new CountDownLatch(1);
+        AtomicReference<MetadataSnapshot> newSnapshot = new AtomicReference<>();
+        AtomicReference<Cluster> newCluster = new AtomicReference<>();
+        for (int i = 0; i < numThreads; i++) {
+            final int id = i + 1;
+            service.execute(() -> {
+                if (id % 2 == 0) { // Thread to update metadata.
+                    String oldClusterId = "clusterId";
+                    int nNodes = oldNodeCount + id;
+                    Map<String, Integer> newTopicPartitionCounts = new HashMap<>();
+                    newTopicPartitionCounts.put(topic1, oldPartitionCount + id);
+                    newTopicPartitionCounts.put(topic2, oldPartitionCount + id);
+                    MetadataResponse newMetadataResponse =
+                        RequestTestUtils.metadataUpdateWithIds(oldClusterId, nNodes, Collections.emptyMap(), newTopicPartitionCounts, _tp -> oldLeaderEpoch + id, topicIds);
+                    metadata.updateWithCurrentRequestVersion(newMetadataResponse, true, time.milliseconds());
+                    atleastMetadataUpdatedOnceLatch.countDown();
+                } else { // Thread to read the metadata snapshot, once it's updated
+                    try {
+                        assertTrue(
+                            atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES),
+                            "Test had to wait more than 5 minutes; something went wrong.");
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    newSnapshot.set(metadata.fetchMetadataSnapshot());
+                    newCluster.set(metadata.fetch());
+                }
+                allThreadsDoneLatch.countDown();
+            });
+        }
+        assertTrue(
+            allThreadsDoneLatch.await(5, TimeUnit.MINUTES),
+            "Test had to wait more than 5 minutes; something went wrong.");
+
+        // Validate the new snapshot is up-to-date, with higher partition counts, node count & leader epoch than before.
+        {
+            int newNodeCount = newSnapshot.get().cluster().nodes().size();
+            assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+            int newPartitionCountTopic1 = newSnapshot.get().cluster().partitionCountForTopic(topic1);
+            assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+            int newPartitionCountTopic2 = newSnapshot.get().cluster().partitionCountForTopic(topic2);
+            assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+            int newLeaderEpoch = newSnapshot.get().leaderEpochFor(topic1Part0).getAsInt();
+            assertTrue(oldLeaderEpoch < newLeaderEpoch, "Unexpected value " + newLeaderEpoch);
+        }
+
+        // Validate the new cluster is up-to-date, with higher partition counts & node count than before.
+        {
+            int newNodeCount = newCluster.get().nodes().size();
+            assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+            int newPartitionCountTopic1 = newCluster.get().partitionCountForTopic(topic1);
+            assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+            int newPartitionCountTopic2 = newCluster.get()
+                .partitionCountForTopic(topic2);
+            assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+        }
+
+        service.shutdown();
+        // Executor service should shut down quickly, as all tasks have finished at this point.
+        assertTrue(service.awaitTermination(60, TimeUnit.SECONDS));
+    }
+
     /**
      * For testUpdatePartially, validates that updatedMetadata is matching expected part1Metadata, part2Metadata, interalPartMetadata, nodes & more.
      */