You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/17 13:20:20 UTC

[kafka] branch trunk updated: KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8de7d37  KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
8de7d37 is described below

commit 8de7d3772443a63e46649ab651260db277efafec
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 17 14:20:04 2019 +0100

    KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
    
    Initiate `unreachable server` scenario before starting admin client to avoid timing issues if node is disconnected from the test thread while admin client network thread is processing a metadata request.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/admin/AdminClientUnitTestEnv.java    | 18 ++++++++++++------
 .../kafka/clients/admin/KafkaAdminClientTest.java      |  5 +++--
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 6023c63..42166b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,14 +54,18 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
-        this(time, cluster, newStrMap(vals));
+        this(time, cluster, clientConfigs(vals));
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster) {
-        this(time, cluster, newStrMap());
+        this(time, cluster, clientConfigs());
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
+        this(time, cluster, config, Collections.emptyMap());
+    }
+
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config, Map<Node, Long> unreachableNodes) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
@@ -86,6 +91,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         });
 
         metadataManager.update(cluster, time.milliseconds());
+        unreachableNodes.forEach(mockClient::setUnreachable);
         this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager, mockClient, time);
     }
 
@@ -110,15 +116,15 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         this.adminClient.close();
     }
 
-    private static Map<String, Object> newStrMap(String... vals) {
+    static Map<String, Object> clientConfigs(String... overrides) {
         Map<String, Object> map = new HashMap<>();
         map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
         map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
-        if (vals.length % 2 != 0) {
+        if (overrides.length % 2 != 0) {
             throw new IllegalStateException();
         }
-        for (int i = 0; i < vals.length; i += 2) {
-            map.put(vals[i], vals[i + 1]);
+        for (int i = 0; i < overrides.length; i += 2) {
+            map.put(overrides[i], overrides[i + 1]);
         }
         return map;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 40ba8b1..6aaa75b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -286,10 +286,11 @@ public class KafkaAdminClientTest {
         // which prevents AdminClient from being able to send the initial metadata request
 
         Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
+        Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L);
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+                AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
             Cluster discoveredCluster = mockCluster(0);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
             env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
                     MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));