You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/22 17:12:34 UTC
[kafka] branch trunk updated: MINOR: AdminClient metadata manager should reset state on failure
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1b30a1 MINOR: AdminClient metadata manager should reset state on failure
c1b30a1 is described below
commit c1b30a12b1a01ca0d1e39783884e8180f79ed8b1
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 22 10:11:48 2018 -0700
MINOR: AdminClient metadata manager should reset state on failure
If the internal metadata request fails, we must reset the state inside `AdminMetadataManager`; otherwise we will be stuck indefinitely in the `UPDATE_PENDING` state and have no way to fetch new metadata.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Dong Lin <li...@gmail.com>
Closes #5057 from hachikuji/fix-admin-client-metadata-update-failure
---
.../kafka/clients/admin/KafkaAdminClient.java | 20 ++--
.../admin/internals/AdminMetadataManager.java | 58 ++++++-----
.../java/org/apache/kafka/clients/MockClient.java | 14 +--
.../kafka/clients/admin/KafkaAdminClientTest.java | 100 +++++++++++++------
.../admin/internals/AdminMetadataManagerTest.java | 107 +++++++++++++++++++++
5 files changed, 219 insertions(+), 80 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6235918..9ae8bcd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -325,7 +325,7 @@ public class KafkaAdminClient extends AdminClient {
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
- AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+ AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
@@ -373,7 +373,7 @@ public class KafkaAdminClient extends AdminClient {
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
LogContext logContext = createLogContext(clientId);
- AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+ AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
@@ -398,7 +398,7 @@ public class KafkaAdminClient extends AdminClient {
this.metadataManager = metadataManager;
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
- metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds(), null);
+ metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
this.metrics = metrics;
this.client = client;
this.runnable = new AdminClientRunnable();
@@ -844,8 +844,7 @@ public class KafkaAdminClient extends AdminClient {
*/
private long sendEligibleCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
- for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
- iter.hasNext(); ) {
+ for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Node, List<Call>> entry = iter.next();
List<Call> calls = entry.getValue();
if (calls.isEmpty()) {
@@ -1140,19 +1139,12 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
- metadataManager.update(response.cluster(), time.milliseconds(), null);
+ metadataManager.update(response.cluster(), time.milliseconds());
}
@Override
public void handleFailure(Throwable e) {
- if (e instanceof AuthenticationException) {
- log.info("Unable to fetch cluster metadata from node {} because of " +
- "authentication error", curNode(), e);
- metadataManager.update(Cluster.empty(), time.milliseconds(), (AuthenticationException) e);
- } else {
- log.info("Unable to fetch cluster metadata from node {}",
- curNode(), e);
- }
+ metadataManager.updateFailed(e);
}
};
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 3806560..e06aed2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.util.Collections;
@@ -41,11 +40,6 @@ public class AdminMetadataManager {
private Logger log;
/**
- * The timer.
- */
- private final Time time;
-
- /**
* The minimum amount of time that we should wait between subsequent
* retries, when fetching metadata.
*/
@@ -112,8 +106,7 @@ public class AdminMetadataManager {
@Override
public void handleAuthenticationFailure(AuthenticationException e) {
- log.info("AdminMetadataManager got AuthenticationException", e);
- update(Cluster.empty(), time.milliseconds(), e);
+ updateFailed(e);
}
@Override
@@ -133,13 +126,11 @@ public class AdminMetadataManager {
enum State {
QUIESCENT,
UPDATE_REQUESTED,
- UPDATE_PENDING;
+ UPDATE_PENDING
}
- public AdminMetadataManager(LogContext logContext, Time time, long refreshBackoffMs,
- long metadataExpireMs) {
+ public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {
this.log = logContext.logger(AdminMetadataManager.class);
- this.time = time;
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.updater = new AdminMetadataUpdater();
@@ -204,20 +195,26 @@ public class AdminMetadataManager {
// Calculate the time remaining until the next periodic update.
// We want to avoid making many metadata requests in a short amount of time,
// so there is a metadata refresh backoff period.
- long timeSinceUpdate = now - lastMetadataUpdateMs;
- long timeRemainingUntilUpdate = metadataExpireMs - timeSinceUpdate;
- long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
- long timeRemainingUntilAttempt = refreshBackoffMs - timeSinceAttempt;
- return Math.max(Math.max(0L, timeRemainingUntilUpdate), timeRemainingUntilAttempt);
+ return Math.max(delayBeforeNextAttemptMs(now), delayBeforeNextExpireMs(now));
case UPDATE_REQUESTED:
- // An update has been explicitly requested. Do it as soon as possible.
- return 0;
+ // Respect the backoff, even if an update has been requested
+ return delayBeforeNextAttemptMs(now);
default:
// An update is already pending, so we don't need to initiate another one.
return Long.MAX_VALUE;
}
}
+ private long delayBeforeNextExpireMs(long now) {
+ long timeSinceUpdate = now - lastMetadataUpdateMs;
+ return Math.max(0, metadataExpireMs - timeSinceUpdate);
+ }
+
+ private long delayBeforeNextAttemptMs(long now) {
+ long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
+ return Math.max(0, refreshBackoffMs - timeSinceAttempt);
+ }
+
/**
* Transition into the UPDATE_PENDING state. Updates lastMetadataFetchAttemptMs.
*/
@@ -226,20 +223,33 @@ public class AdminMetadataManager {
this.lastMetadataFetchAttemptMs = now;
}
+ public void updateFailed(Throwable exception) {
+ // We depend on pending calls to request another metadata update
+ this.state = State.QUIESCENT;
+
+ if (exception instanceof AuthenticationException) {
+ log.warn("Metadata update failed due to authentication error", exception);
+ this.authException = (AuthenticationException) exception;
+ } else {
+ log.info("Metadata update failed", exception);
+ }
+ }
+
/**
* Receive new metadata, and transition into the QUIESCENT state.
* Updates lastMetadataUpdateMs, cluster, and authException.
*/
- public void update(Cluster cluster, long now, AuthenticationException authException) {
+ public void update(Cluster cluster, long now) {
if (cluster.isBootstrapConfigured()) {
log.debug("Setting bootstrap cluster metadata {}.", cluster);
} else {
- log.debug("Received cluster metadata {}{}.",
- cluster, authException == null ? "" : " with authentication exception.");
+ log.debug("Updating cluster metadata to {}", cluster);
+ this.lastMetadataUpdateMs = now;
}
+
this.state = State.QUIESCENT;
- this.lastMetadataUpdateMs = now;
- this.authException = authException;
+ this.authException = null;
+
if (!cluster.nodes().isEmpty()) {
this.cluster = cluster;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 7a8ba1c..7da4a30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -29,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -87,11 +85,11 @@ public class MockClient implements KafkaClient {
private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>();
- private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
- private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
+ private final Queue<FutureResponse> futureResponses = new ConcurrentLinkedDeque<>();
+ private final Queue<MetadataUpdate> metadataUpdates = new ConcurrentLinkedDeque<>();
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
private volatile int numBlockingWakeups = 0;
- private final AtomicInteger totalRequestCount = new AtomicInteger(0);
+
public MockClient(Time time) {
this(time, null);
}
@@ -422,7 +420,6 @@ public class MockClient implements KafkaClient {
futureResponses.clear();
metadataUpdates.clear();
authenticationErrors.clear();
- totalRequestCount.set(0);
}
public boolean hasPendingMetadataUpdates() {
@@ -490,7 +487,6 @@ public class MockClient implements KafkaClient {
@Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse, RequestCompletionHandler callback) {
- totalRequestCount.incrementAndGet();
return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
expectResponse, callback);
}
@@ -534,8 +530,4 @@ public class MockClient implements KafkaClient {
}
}
- // visible for testing
- public int totalRequestCount() {
- return totalRequestCount.get();
- }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cdd9a28..92cda40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -78,9 +78,9 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -93,6 +93,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
import static org.apache.kafka.common.requests.ResourceType.BROKER;
import static org.apache.kafka.common.requests.ResourceType.TOPIC;
import static org.junit.Assert.assertEquals;
@@ -223,7 +224,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(new Node(0, "localhost", 8121));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
assertFutureError(future, TimeoutException.class);
}
@@ -247,7 +248,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(env.cluster().nodeById(0));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
assertFutureError(future, SaslAuthenticationException.class);
}
@@ -261,7 +262,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(10000)).all();
future.get();
}
@@ -285,8 +286,8 @@ public class KafkaAdminClientTest {
Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
env.cluster().nodeById(1));
KafkaFuture<Void> future = env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
- new CreateTopicsOptions().timeoutMs(10000)).all();
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+ new CreateTopicsOptions().timeoutMs(10000)).all();
future.get();
}
}
@@ -298,32 +299,69 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
- List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
- Map<String, KafkaFuture<Void>> deleteFutures =
- env.adminClient().deleteTopics(sillyTopicNames).values();
+ List<String> sillyTopicNames = asList("", null);
+ Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
}
- assertEquals(0, env.kafkaClient().totalRequestCount());
+ assertEquals(0, env.kafkaClient().inFlightRequestCount());
Map<String, KafkaFuture<TopicDescription>> describeFutures =
env.adminClient().describeTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
}
- assertEquals(0, env.kafkaClient().totalRequestCount());
+ assertEquals(0, env.kafkaClient().inFlightRequestCount());
List<NewTopic> newTopics = new ArrayList<>();
for (String sillyTopicName : sillyTopicNames) {
newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
}
- Map<String, KafkaFuture<Void>> createFutures =
- env.adminClient().createTopics(newTopics).values();
+ Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
}
- assertEquals(0, env.kafkaClient().totalRequestCount());
+ assertEquals(0, env.kafkaClient().inFlightRequestCount());
+ }
+ }
+
+ @Test
+ public void testMetadataRetries() throws Exception {
+ // We should continue retrying on metadata update failures in spite of retry configuration
+
+ String topic = "topic";
+ MockClient mockClient = new MockClient(Time.SYSTEM);
+ Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
+ Cluster initializedCluster = mockCluster(0);
+ mockClient.setNode(bootstrapCluster.nodes().get(0));
+
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, bootstrapCluster,
+ newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
+ AdminClientConfig.RETRIES_CONFIG, "0"))) {
+
+ // The first request fails with a disconnect
+ env.kafkaClient().prepareResponse(null, true);
+
+ // The next one succeeds and gives us the controller id
+ env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
+ initializedCluster.clusterResource().clusterId(),
+ initializedCluster.controller().id(),
+ Collections.<MetadataResponse.TopicMetadata>emptyList()));
+
+ // Then we respond to the DescribeTopic request
+ Node leader = initializedCluster.nodes().get(0);
+ MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+ Errors.NONE, 0, leader, singletonList(leader), singletonList(leader), singletonList(leader));
+ env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
+ initializedCluster.clusterResource().clusterId(), 1,
+ singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
+ singletonList(partitionMetadata)))));
+
+ DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
+ Map<String, TopicDescription> topicDescriptions = result.all().get();
+ assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
}
}
@@ -347,7 +385,7 @@ public class KafkaAdminClientTest {
private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
try {
env.adminClient().createTopics(
- Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(10000)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
@@ -645,15 +683,15 @@ public class KafkaAdminClientTest {
List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0),
- Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+ singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0),
- Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+ singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
- Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+ singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0),
- Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+ singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0),
- Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+ singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
@@ -738,24 +776,24 @@ public class KafkaAdminClientTest {
List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0,
- Collections.singletonList(node0), Collections.singletonList(node0), Collections.<Node>emptyList()));
+ singletonList(node0), singletonList(node0), Collections.<Node>emptyList()));
partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1,
- Collections.singletonList(node1), Collections.singletonList(node1), Collections.<Node>emptyList()));
+ singletonList(node1), singletonList(node1), Collections.<Node>emptyList()));
partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2,
- Collections.singletonList(node2), Collections.singletonList(node2), Collections.<Node>emptyList()));
+ singletonList(node2), singletonList(node2), Collections.<Node>emptyList()));
env.kafkaClient().prepareResponse(
new MetadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
- Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
+ singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.NONE,
- Arrays.asList(
+ asList(
new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
new ListGroupsResponse.Group("group-connect-1", "connector")
)),
@@ -771,7 +809,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.NONE,
- Arrays.asList(
+ asList(
new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
new ListGroupsResponse.Group("group-connect-2", "connector")
)),
@@ -792,7 +830,7 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
- Collections.singletonList(new MetadataResponse.TopicMetadata(
+ singletonList(new MetadataResponse.TopicMetadata(
Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
@@ -843,7 +881,7 @@ public class KafkaAdminClientTest {
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
- Arrays.asList(
+ asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
groupMetadataMap.put(
@@ -853,13 +891,13 @@ public class KafkaAdminClientTest {
"",
"connect",
"",
- Arrays.asList(
+ asList(
new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
- final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
+ final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
assertEquals(1, result.describedGroups().size());
@@ -921,7 +959,7 @@ public class KafkaAdminClientTest {
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
- final List<String> groupIds = Collections.singletonList("group-0");
+ final List<String> groupIds = singletonList("group-0");
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
new file mode 100644
index 0000000..7254123
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AdminMetadataManagerTest {
+ private final MockTime time = new MockTime();
+ private final LogContext logContext = new LogContext();
+ private final long refreshBackoffMs = 100;
+ private final long metadataExpireMs = 60000;
+ private final AdminMetadataManager mgr = new AdminMetadataManager(
+ logContext, refreshBackoffMs, metadataExpireMs);
+
+ @Test
+ public void testMetadataReady() {
+ // Metadata is not ready on initialization
+ assertFalse(mgr.isReady());
+ assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ // Metadata is not ready when bootstrap servers are set
+ mgr.update(Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9999))),
+ time.milliseconds());
+ assertFalse(mgr.isReady());
+ assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ mgr.update(mockCluster(), time.milliseconds());
+ assertTrue(mgr.isReady());
+ assertEquals(metadataExpireMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ time.sleep(metadataExpireMs);
+ assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+ }
+
+ @Test
+ public void testMetadataRefreshBackoff() {
+ mgr.transitionToUpdatePending(time.milliseconds());
+ assertEquals(Long.MAX_VALUE, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ mgr.updateFailed(new RuntimeException());
+ assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ // Even if we explicitly request an update, the backoff should be respected
+ mgr.requestUpdate();
+ assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+ time.sleep(refreshBackoffMs);
+ assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+ }
+
+ @Test
+ public void testAuthenticationFailure() {
+ mgr.transitionToUpdatePending(time.milliseconds());
+ mgr.updateFailed(new AuthenticationException("Authentication failed"));
+ assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+ try {
+ mgr.isReady();
+ fail("Expected AuthenticationException to be thrown");
+ } catch (AuthenticationException e) {
+ // Expected
+ }
+
+ mgr.update(mockCluster(), time.milliseconds());
+ assertTrue(mgr.isReady());
+ }
+
+ private static Cluster mockCluster() {
+ HashMap<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+ nodes.put(1, new Node(1, "localhost", 8122));
+ nodes.put(2, new Node(2, "localhost", 8123));
+ return new Cluster("mockClusterId", nodes.values(),
+ Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
lindong@apache.org.