Posted to commits@kafka.apache.org by jg...@apache.org on 2024/02/26 23:57:18 UTC
(kafka) branch trunk updated: KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddfcc333f8d KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)
ddfcc333f8d is described below
commit ddfcc333f8d1c2f4ab0a88abf6d664153d6e82fd
Author: Mayank Shekhar Narula <42...@users.noreply.github.com>
AuthorDate: Mon Feb 26 23:57:11 2024 +0000
KAFKA-16226 Add test for concurrently updating Metadata and fetching snapshot/cluster (#15385)
Add test for concurrently updating Metadata and fetching snapshot/cluster
Reviewers: Jason Gustafson <ja...@confluent.io>
Co-authored-by: Zhifeng Chen <er...@gmail.com>
---
.../org/apache/kafka/clients/MetadataTest.java | 106 +++++++++++++++++++++
1 file changed, 106 insertions(+)
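For context, and not part of the commit below: the property the new test exercises is that readers always get a complete, immutable snapshot while writers publish updates atomically. A minimal, hypothetical Java sketch of that publish/read pattern (SnapshotHolder and Snapshot are illustrative stand-ins, not Kafka classes):

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Metadata and its MetadataSnapshot; not Kafka code.
final class SnapshotHolder {
    // Immutable value object, so a reader can never observe a half-built state.
    record Snapshot(int nodeCount, int leaderEpoch) { }

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(0, 0));

    // Writer: build a new immutable snapshot, then publish it in one atomic step.
    void update(int nodeCount, int leaderEpoch) {
        current.set(new Snapshot(nodeCount, leaderEpoch));
    }

    // Reader: returns a consistent view even while writers update concurrently.
    Snapshot fetch() {
        return current.get();
    }
}

Because each snapshot is immutable and published atomically, a concurrent reader sees either the old state or the new one, never a mix; the assertions in the test below check Metadata for the same behavior.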
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 600fc23ecb9..e6db9685eb5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.clients;
+import java.util.OptionalInt;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.Node;
@@ -1260,6 +1265,107 @@ public class MetadataTest {
Mockito.reset(mockListener);
}
+ /**
+ * Test that concurrently updating Metadata and fetching the corresponding MetadataSnapshot & Cluster works as
+ * expected, i.e. the snapshot & cluster contain the relevant updates.
+ */
+ @Test
+ public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws InterruptedException {
+ Time time = new MockTime();
+ metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+
+ // Set up metadata with 10 nodes and 2 topics, topic1 & topic2, both retained in the update, each with leader-epoch 100.
+ int oldNodeCount = 10;
+ String topic1 = "test_topic1";
+ String topic2 = "test_topic2";
+ TopicPartition topic1Part0 = new TopicPartition(topic1, 0);
+ Map<String, Integer> topicPartitionCounts = new HashMap<>();
+ int oldPartitionCount = 1;
+ topicPartitionCounts.put(topic1, oldPartitionCount);
+ topicPartitionCounts.put(topic2, oldPartitionCount);
+ Map<String, Uuid> topicIds = new HashMap<>();
+ topicIds.put(topic1, Uuid.randomUuid());
+ topicIds.put(topic2, Uuid.randomUuid());
+ int oldLeaderEpoch = 100;
+ MetadataResponse metadataResponse =
+ RequestTestUtils.metadataUpdateWithIds("cluster", oldNodeCount, Collections.emptyMap(), topicPartitionCounts, _tp -> oldLeaderEpoch, topicIds);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+ MetadataSnapshot snapshot = metadata.fetchMetadataSnapshot();
+ Cluster cluster = metadata.fetch();
+ // Validate metadata snapshot & cluster are setup as expected.
+ assertEquals(cluster, snapshot.cluster());
+ assertEquals(oldNodeCount, snapshot.cluster().nodes().size());
+ assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic1));
+ assertEquals(oldPartitionCount, snapshot.cluster().partitionCountForTopic(topic2));
+ assertEquals(OptionalInt.of(oldLeaderEpoch), snapshot.leaderEpochFor(topic1Part0));
+
+ // Set up 6 threads: 3 update metadata & 3 read the snapshot/cluster.
+ // Metadata will be updated with a higher number of nodes, higher partition counts & a higher leader-epoch.
+ int numThreads = 6;
+ ExecutorService service = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch allThreadsDoneLatch = new CountDownLatch(numThreads);
+ CountDownLatch metadataUpdatedAtLeastOnceLatch = new CountDownLatch(1);
+ AtomicReference<MetadataSnapshot> newSnapshot = new AtomicReference<>();
+ AtomicReference<Cluster> newCluster = new AtomicReference<>();
+ for (int i = 0; i < numThreads; i++) {
+ final int id = i + 1;
+ service.execute(() -> {
+ if (id % 2 == 0) { // Thread to update metadata.
+ String oldClusterId = "clusterId";
+ int nNodes = oldNodeCount + id;
+ Map<String, Integer> newTopicPartitionCounts = new HashMap<>();
+ newTopicPartitionCounts.put(topic1, oldPartitionCount + id);
+ newTopicPartitionCounts.put(topic2, oldPartitionCount + id);
+ MetadataResponse newMetadataResponse =
+ RequestTestUtils.metadataUpdateWithIds(oldClusterId, nNodes, Collections.emptyMap(), newTopicPartitionCounts, _tp -> oldLeaderEpoch + id, topicIds);
+ metadata.updateWithCurrentRequestVersion(newMetadataResponse, true, time.milliseconds());
+ metadataUpdatedAtLeastOnceLatch.countDown();
+ } else { // Thread to read the metadata snapshot, once it's updated.
+ try {
+ assertTrue(metadataUpdatedAtLeastOnceLatch.await(5, TimeUnit.MINUTES),
+ "Test had to wait more than 5 minutes, something went wrong.");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ newSnapshot.set(metadata.fetchMetadataSnapshot());
+ newCluster.set(metadata.fetch());
+ }
+ allThreadsDoneLatch.countDown();
+ });
+ }
+ assertTrue(allThreadsDoneLatch.await(5, TimeUnit.MINUTES),
+ "Test had to wait more than 5 minutes, something went wrong.");
+
+ // Validate the new snapshot is up-to-date, with a higher node count, partition counts & leader-epoch than before.
+ {
+ int newNodeCount = newSnapshot.get().cluster().nodes().size();
+ assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+ int newPartitionCountTopic1 = newSnapshot.get().cluster().partitionCountForTopic(topic1);
+ assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+ int newPartitionCountTopic2 = newSnapshot.get().cluster().partitionCountForTopic(topic2);
+ assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+ int newLeaderEpoch = newSnapshot.get().leaderEpochFor(topic1Part0).getAsInt();
+ assertTrue(oldLeaderEpoch < newLeaderEpoch, "Unexpected value " + newLeaderEpoch);
+ }
+
+ // Validate the new cluster is up-to-date, with higher node and partition counts than before.
+ {
+ int newNodeCount = newCluster.get().nodes().size();
+ assertTrue(oldNodeCount < newNodeCount, "Unexpected value " + newNodeCount);
+ int newPartitionCountTopic1 = newCluster.get().partitionCountForTopic(topic1);
+ assertTrue(oldPartitionCount < newPartitionCountTopic1, "Unexpected value " + newPartitionCountTopic1);
+ int newPartitionCountTopic2 = newCluster.get().partitionCountForTopic(topic2);
+ assertTrue(oldPartitionCount < newPartitionCountTopic2, "Unexpected value " + newPartitionCountTopic2);
+ }
+
+ service.shutdown();
+ // The executor service should shut down quickly, as all tasks have finished at this point.
+ assertTrue(service.awaitTermination(60, TimeUnit.SECONDS));
+ }
+
/**
* For testUpdatePartially, validates that updatedMetadata matches the expected part1Metadata, part2Metadata, internalPartMetadata, nodes & more.
*/