Posted to commits@kafka.apache.org by mi...@apache.org on 2022/07/25 13:30:56 UTC

[kafka] branch trunk updated: KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery (#12433)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71d225d7c2 KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery (#12433)
71d225d7c2 is described below

commit 71d225d7c2489f4921e45bbc9d1f182a8419087e
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Mon Jul 25 09:30:38 2022 -0400

    KAFKA-14093: Use single-worker Connect cluster when testing fenced leader recovery (#12433)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Tom Bentley <tb...@redhat.com>
---
 .../integration/ExactlyOnceSourceIntegrationTest.java   | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)
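
Beyond pinning the test cluster to a single worker, the patch threads an explicit client.id through the test's transactional producers, presumably so log output from the simulated producers is easier to attribute. A minimal, self-contained Java sketch of what the two-argument helper configures is below; the bootstrap address, serializers, and the example transactional.id are placeholder assumptions, since the actual test obtains its producer from the embedded cluster's createProducer helper:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TransactionalProducerSketch {

        // Hypothetical bootstrap address; the integration test resolves this
        // from its embedded Kafka cluster instead.
        private static final String BOOTSTRAP = "localhost:9092";

        // Mirrors the updated helper shape: the caller supplies both a
        // client.id (for log correlation) and the transactional.id (which
        // determines fencing behaviour).
        static KafkaProducer<byte[], byte[]> transactionalProducer(String clientId, String transactionalId) {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            return new KafkaProducer<>(props);
        }

        public static void main(String[] args) {
            // Calling initTransactions() with a transactional.id already in use
            // fences the previous holder of that id; the test exploits this to
            // simulate a zombie leader. The id below is a placeholder, not the
            // real value computed by DistributedConfig.transactionalProducerId(...).
            try (KafkaProducer<byte[], byte[]> zombie =
                         transactionalProducer("simulated-zombie-leader", "example-transactional-id")) {
                zombie.initTransactions();
            }
        }
    }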

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 25a419ed8f..90fcaa8a21 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -76,6 +76,7 @@ import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
 import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
@@ -466,6 +467,7 @@ public class ExactlyOnceSourceIntegrationTest {
      */
     @Test
     public void testFencedLeaderRecovery() throws Exception {
+        connectBuilder.numWorkers(1);
         // Much slower offset commit interval; should never be triggered during this test
         workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000");
         startConnect();
@@ -494,7 +496,10 @@ public class ExactlyOnceSourceIntegrationTest {
         assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
 
         // fence out the leader of the cluster
-        Producer<?, ?> zombieLeader = transactionalProducer(DistributedConfig.transactionalProducerId(CLUSTER_GROUP_ID));
+        Producer<?, ?> zombieLeader = transactionalProducer(
+                "simulated-zombie-leader",
+                DistributedConfig.transactionalProducerId(CLUSTER_GROUP_ID)
+        );
         zombieLeader.initTransactions();
         zombieLeader.close();
 
@@ -1030,9 +1035,10 @@ public class ExactlyOnceSourceIntegrationTest {
 
         // create a collection of producers that simulate the producers used for the existing tasks
         List<KafkaProducer<byte[], byte[]>> producers = IntStream.range(0, currentNumTasks)
-                .mapToObj(i -> Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i))
-                .map(this::transactionalProducer)
-                .collect(Collectors.toList());
+                .mapToObj(i -> transactionalProducer(
+                        "simulated-task-producer-" + CONNECTOR_NAME + "-" + i,
+                        Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i)
+                )).collect(Collectors.toList());
 
         producers.forEach(KafkaProducer::initTransactions);
 
@@ -1047,8 +1053,9 @@ public class ExactlyOnceSourceIntegrationTest {
         producers.forEach(producer -> assertTransactionalProducerIsFenced(producer, topic));
     }
 
-    private KafkaProducer<byte[], byte[]> transactionalProducer(String transactionalId) {
+    private KafkaProducer<byte[], byte[]> transactionalProducer(String clientId, String transactionalId) {
         Map<String, Object> transactionalProducerProps = new HashMap<>();
+        transactionalProducerProps.put(CLIENT_ID_CONFIG, clientId);
         transactionalProducerProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
         transactionalProducerProps.put(TRANSACTIONAL_ID_CONFIG, transactionalId);
         return connect.kafka().createProducer(transactionalProducerProps);