You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/17 13:20:20 UTC
[kafka] branch trunk updated: KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8de7d37 KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
8de7d37 is described below
commit 8de7d3772443a63e46649ab651260db277efafec
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 17 14:20:04 2019 +0100
KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
Initiate the `unreachable server` scenario before starting the admin client, to avoid timing issues that occur if the node is disconnected from the test thread while the admin client's network thread is processing a metadata request.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/admin/AdminClientUnitTestEnv.java | 18 ++++++++++++------
.../kafka/clients/admin/KafkaAdminClientTest.java | 5 +++--
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 6023c63..42166b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,14 +54,18 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
}
public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
- this(time, cluster, newStrMap(vals));
+ this(time, cluster, clientConfigs(vals));
}
public AdminClientUnitTestEnv(Time time, Cluster cluster) {
- this(time, cluster, newStrMap());
+ this(time, cluster, clientConfigs());
}
public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
+ this(time, cluster, config, Collections.emptyMap());
+ }
+
+ public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config, Map<Node, Long> unreachableNodes) {
this.time = time;
this.cluster = cluster;
AdminClientConfig adminClientConfig = new AdminClientConfig(config);
@@ -86,6 +91,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
});
metadataManager.update(cluster, time.milliseconds());
+ unreachableNodes.forEach(mockClient::setUnreachable);
this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager, mockClient, time);
}
@@ -110,15 +116,15 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
this.adminClient.close();
}
- private static Map<String, Object> newStrMap(String... vals) {
+ static Map<String, Object> clientConfigs(String... overrides) {
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
- if (vals.length % 2 != 0) {
+ if (overrides.length % 2 != 0) {
throw new IllegalStateException();
}
- for (int i = 0; i < vals.length; i += 2) {
- map.put(vals[i], vals[i + 1]);
+ for (int i = 0; i < overrides.length; i += 2) {
+ map.put(overrides[i], overrides[i + 1]);
}
return map;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 40ba8b1..6aaa75b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -286,10 +286,11 @@ public class KafkaAdminClientTest {
// which prevents AdminClient from being able to send the initial metadata request
Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
+ Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L);
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+ AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
Cluster discoveredCluster = mockCluster(0);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));