You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/22 17:12:34 UTC

[kafka] branch trunk updated: MINOR: AdminClient metadata manager should reset state on failure

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1b30a1  MINOR: AdminClient metadata manager should reset state on failure
c1b30a1 is described below

commit c1b30a12b1a01ca0d1e39783884e8180f79ed8b1
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 22 10:11:48 2018 -0700

    MINOR: AdminClient metadata manager should reset state on failure
    
    If the internal metadata request fails, we must reset the state inside `AdminMetadataManager` or we will be stuck indefinitely in the `UPDATE_PENDING` state and have no way to fetch new metadata.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5057 from hachikuji/fix-admin-client-metadata-update-failure
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  20 ++--
 .../admin/internals/AdminMetadataManager.java      |  58 ++++++-----
 .../java/org/apache/kafka/clients/MockClient.java  |  14 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 100 +++++++++++++------
 .../admin/internals/AdminMetadataManagerTest.java  | 107 +++++++++++++++++++++
 5 files changed, 219 insertions(+), 80 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6235918..9ae8bcd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -325,7 +325,7 @@ public class KafkaAdminClient extends AdminClient {
         try {
             // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
             // simplifies communication with older brokers)
-            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
@@ -373,7 +373,7 @@ public class KafkaAdminClient extends AdminClient {
         try {
             metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
             LogContext logContext = createLogContext(clientId);
-            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
@@ -398,7 +398,7 @@ public class KafkaAdminClient extends AdminClient {
         this.metadataManager = metadataManager;
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
             config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds(), null);
+        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
@@ -844,8 +844,7 @@ public class KafkaAdminClient extends AdminClient {
          */
         private long sendEligibleCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
-            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
-                     iter.hasNext(); ) {
+            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
                 Map.Entry<Node, List<Call>> entry = iter.next();
                 List<Call> calls = entry.getValue();
                 if (calls.isEmpty()) {
@@ -1140,19 +1139,12 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     MetadataResponse response = (MetadataResponse) abstractResponse;
-                    metadataManager.update(response.cluster(), time.milliseconds(), null);
+                    metadataManager.update(response.cluster(), time.milliseconds());
                 }
 
                 @Override
                 public void handleFailure(Throwable e) {
-                    if (e instanceof AuthenticationException) {
-                        log.info("Unable to fetch cluster metadata from node {} because of " +
-                            "authentication error", curNode(), e);
-                        metadataManager.update(Cluster.empty(), time.milliseconds(), (AuthenticationException) e);
-                    } else {
-                        log.info("Unable to fetch cluster metadata from node {}",
-                            curNode(), e);
-                    }
+                    metadataManager.updateFailed(e);
                 }
             };
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 3806560..e06aed2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
 import java.util.Collections;
@@ -41,11 +40,6 @@ public class AdminMetadataManager {
     private Logger log;
 
     /**
-     * The timer.
-     */
-    private final Time time;
-
-    /**
      * The minimum amount of time that we should wait between subsequent
      * retries, when fetching metadata.
      */
@@ -112,8 +106,7 @@ public class AdminMetadataManager {
 
         @Override
         public void handleAuthenticationFailure(AuthenticationException e) {
-            log.info("AdminMetadataManager got AuthenticationException", e);
-            update(Cluster.empty(), time.milliseconds(), e);
+            updateFailed(e);
         }
 
         @Override
@@ -133,13 +126,11 @@ public class AdminMetadataManager {
     enum State {
         QUIESCENT,
         UPDATE_REQUESTED,
-        UPDATE_PENDING;
+        UPDATE_PENDING
     }
 
-    public AdminMetadataManager(LogContext logContext, Time time, long refreshBackoffMs,
-                                long metadataExpireMs) {
+    public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {
         this.log = logContext.logger(AdminMetadataManager.class);
-        this.time = time;
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.updater = new AdminMetadataUpdater();
@@ -204,20 +195,26 @@ public class AdminMetadataManager {
                 // Calculate the time remaining until the next periodic update.
                 // We want to avoid making many metadata requests in a short amount of time,
                 // so there is a metadata refresh backoff period.
-                long timeSinceUpdate = now - lastMetadataUpdateMs;
-                long timeRemainingUntilUpdate = metadataExpireMs - timeSinceUpdate;
-                long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
-                long timeRemainingUntilAttempt = refreshBackoffMs - timeSinceAttempt;
-                return Math.max(Math.max(0L, timeRemainingUntilUpdate), timeRemainingUntilAttempt);
+                return Math.max(delayBeforeNextAttemptMs(now), delayBeforeNextExpireMs(now));
             case UPDATE_REQUESTED:
-                // An update has been explicitly requested.  Do it as soon as possible.
-                return 0;
+                // Respect the backoff, even if an update has been requested
+                return delayBeforeNextAttemptMs(now);
             default:
                 // An update is already pending, so we don't need to initiate another one.
                 return Long.MAX_VALUE;
         }
     }
 
+    private long delayBeforeNextExpireMs(long now) {
+        long timeSinceUpdate = now - lastMetadataUpdateMs;
+        return Math.max(0, metadataExpireMs - timeSinceUpdate);
+    }
+
+    private long delayBeforeNextAttemptMs(long now) {
+        long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
+        return Math.max(0, refreshBackoffMs - timeSinceAttempt);
+    }
+
     /**
      * Transition into the UPDATE_PENDING state.  Updates lastMetadataFetchAttemptMs.
      */
@@ -226,20 +223,33 @@ public class AdminMetadataManager {
         this.lastMetadataFetchAttemptMs = now;
     }
 
+    public void updateFailed(Throwable exception) {
+        // We depend on pending calls to request another metadata update
+        this.state = State.QUIESCENT;
+
+        if (exception instanceof AuthenticationException) {
+            log.warn("Metadata update failed due to authentication error", exception);
+            this.authException = (AuthenticationException) exception;
+        } else {
+            log.info("Metadata update failed", exception);
+        }
+    }
+
     /**
      * Receive new metadata, and transition into the QUIESCENT state.
      * Updates lastMetadataUpdateMs, cluster, and authException.
      */
-    public void update(Cluster cluster, long now, AuthenticationException authException) {
+    public void update(Cluster cluster, long now) {
         if (cluster.isBootstrapConfigured()) {
             log.debug("Setting bootstrap cluster metadata {}.", cluster);
         } else {
-            log.debug("Received cluster metadata {}{}.",
-                cluster, authException == null ? "" : " with authentication exception.");
+            log.debug("Updating cluster metadata to {}", cluster);
+            this.lastMetadataUpdateMs = now;
         }
+
         this.state = State.QUIESCENT;
-        this.lastMetadataUpdateMs = now;
-        this.authException = authException;
+        this.authException = null;
+
         if (!cluster.nodes().isEmpty()) {
             this.cluster = cluster;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 7a8ba1c..7da4a30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -29,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -87,11 +85,11 @@ public class MockClient implements KafkaClient {
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
     private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>();
-    private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
-    private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
+    private final Queue<FutureResponse> futureResponses = new ConcurrentLinkedDeque<>();
+    private final Queue<MetadataUpdate> metadataUpdates = new ConcurrentLinkedDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
-    private final AtomicInteger totalRequestCount = new AtomicInteger(0);
+
     public MockClient(Time time) {
         this(time, null);
     }
@@ -422,7 +420,6 @@ public class MockClient implements KafkaClient {
         futureResponses.clear();
         metadataUpdates.clear();
         authenticationErrors.clear();
-        totalRequestCount.set(0);
     }
 
     public boolean hasPendingMetadataUpdates() {
@@ -490,7 +487,6 @@ public class MockClient implements KafkaClient {
     @Override
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse, RequestCompletionHandler callback) {
-        totalRequestCount.incrementAndGet();
         return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
                 expectResponse, callback);
     }
@@ -534,8 +530,4 @@ public class MockClient implements KafkaClient {
         }
     }
 
-    // visible for testing
-    public int totalRequestCount() {
-        return totalRequestCount.get();
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cdd9a28..92cda40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -78,9 +78,9 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -93,6 +93,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
 import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
@@ -223,7 +224,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, TimeoutException.class);
         }
@@ -247,7 +248,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().nodeById(0));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                 new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, SaslAuthenticationException.class);
         }
@@ -261,7 +262,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }
@@ -285,8 +286,8 @@ public class KafkaAdminClientTest {
                     Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
                 env.cluster().nodeById(1));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
-                new CreateTopicsOptions().timeoutMs(10000)).all();
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }
     }
@@ -298,32 +299,69 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
-            List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
-            Map<String, KafkaFuture<Void>> deleteFutures =
-                env.adminClient().deleteTopics(sillyTopicNames).values();
+            List<String> sillyTopicNames = asList("", null);
+            Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
             }
-            assertEquals(0, env.kafkaClient().totalRequestCount());
+            assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
             Map<String, KafkaFuture<TopicDescription>> describeFutures =
                     env.adminClient().describeTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
             }
-            assertEquals(0, env.kafkaClient().totalRequestCount());
+            assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
             List<NewTopic> newTopics = new ArrayList<>();
             for (String sillyTopicName : sillyTopicNames) {
                 newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
             }
 
-            Map<String, KafkaFuture<Void>> createFutures =
-                env.adminClient().createTopics(newTopics).values();
+            Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
             }
-            assertEquals(0, env.kafkaClient().totalRequestCount());
+            assertEquals(0, env.kafkaClient().inFlightRequestCount());
+        }
+    }
+
+    @Test
+    public void testMetadataRetries() throws Exception {
+        // We should continue retrying on metadata update failures in spite of retry configuration
+
+        String topic = "topic";
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
+        Cluster initializedCluster = mockCluster(0);
+        mockClient.setNode(bootstrapCluster.nodes().get(0));
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, bootstrapCluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
+                        AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
+                        AdminClientConfig.RETRIES_CONFIG, "0"))) {
+
+            // The first request fails with a disconnect
+            env.kafkaClient().prepareResponse(null, true);
+
+            // The next one succeeds and gives us the controller id
+            env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
+                    initializedCluster.clusterResource().clusterId(),
+                    initializedCluster.controller().id(),
+                    Collections.<MetadataResponse.TopicMetadata>emptyList()));
+
+            // Then we respond to the DescribeTopic request
+            Node leader = initializedCluster.nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                    Errors.NONE, 0, leader, singletonList(leader), singletonList(leader), singletonList(leader));
+            env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
+                    initializedCluster.clusterResource().clusterId(), 1,
+                    singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
+                            singletonList(partitionMetadata)))));
+
+            DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
+            Map<String, TopicDescription> topicDescriptions = result.all().get();
+            assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
         }
     }
 
@@ -347,7 +385,7 @@ public class KafkaAdminClientTest {
     private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
         try {
             env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(10000)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
@@ -645,15 +683,15 @@ public class KafkaAdminClientTest {
             List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
             List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
             p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0),
-                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
             p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0),
-                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
             p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
-                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
             p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0),
-                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
             p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0),
-                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+                    singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.<Node>emptyList()));
 
             t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
 
@@ -738,24 +776,24 @@ public class KafkaAdminClientTest {
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
             partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0,
-                    Collections.singletonList(node0), Collections.singletonList(node0), Collections.<Node>emptyList()));
+                    singletonList(node0), singletonList(node0), Collections.<Node>emptyList()));
             partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1,
-                    Collections.singletonList(node1), Collections.singletonList(node1), Collections.<Node>emptyList()));
+                    singletonList(node1), singletonList(node1), Collections.<Node>emptyList()));
             partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2,
-                    Collections.singletonList(node2), Collections.singletonList(node2), Collections.<Node>emptyList()));
+                    singletonList(node2), singletonList(node2), Collections.<Node>emptyList()));
 
             env.kafkaClient().prepareResponse(
                     new MetadataResponse(
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             env.cluster().controller().id(),
-                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
+                            singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
                                 Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.NONE,
-                            Arrays.asList(
+                            asList(
                                     new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
                                     new ListGroupsResponse.Group("group-connect-1", "connector")
                             )),
@@ -771,7 +809,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.NONE,
-                            Arrays.asList(
+                            asList(
                                     new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
                                     new ListGroupsResponse.Group("group-connect-2", "connector")
                             )),
@@ -792,7 +830,7 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     env.cluster().controller().id(),
-                    Collections.singletonList(new MetadataResponse.TopicMetadata(
+                    singletonList(new MetadataResponse.TopicMetadata(
                         Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
                         true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
             final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
@@ -843,7 +881,7 @@ public class KafkaAdminClientTest {
                     "",
                     ConsumerProtocol.PROTOCOL_TYPE,
                     "",
-                    Arrays.asList(
+                    asList(
                         new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
                         new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
             groupMetadataMap.put(
@@ -853,13 +891,13 @@ public class KafkaAdminClientTest {
                     "",
                     "connect",
                     "",
-                    Arrays.asList(
+                    asList(
                         new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
                         new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
 
             env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
 
-            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
             final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
 
             assertEquals(1, result.describedGroups().size());
@@ -921,7 +959,7 @@ public class KafkaAdminClientTest {
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet(), nodes.get(0));
 
-        final List<String> groupIds = Collections.singletonList("group-0");
+        final List<String> groupIds = singletonList("group-0");
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
new file mode 100644
index 0000000..7254123
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AdminMetadataManagerTest {
+    private final MockTime time = new MockTime();
+    private final LogContext logContext = new LogContext();
+    private final long refreshBackoffMs = 100;
+    private final long metadataExpireMs = 60000;
+    private final AdminMetadataManager mgr = new AdminMetadataManager(
+            logContext, refreshBackoffMs, metadataExpireMs);
+
+    @Test
+    public void testMetadataReady() {
+        // Metadata is not ready on initialization
+        assertFalse(mgr.isReady());
+        assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        // Metadata is not ready when bootstrap servers are set
+        mgr.update(Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9999))),
+                time.milliseconds());
+        assertFalse(mgr.isReady());
+        assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        mgr.update(mockCluster(), time.milliseconds());
+        assertTrue(mgr.isReady());
+        assertEquals(metadataExpireMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        time.sleep(metadataExpireMs);
+        assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+    }
+
+    @Test
+    public void testMetadataRefreshBackoff() {
+        mgr.transitionToUpdatePending(time.milliseconds());
+        assertEquals(Long.MAX_VALUE, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        mgr.updateFailed(new RuntimeException());
+        assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        // Even if we explicitly request an update, the backoff should be respected
+        mgr.requestUpdate();
+        assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+
+        time.sleep(refreshBackoffMs);
+        assertEquals(0, mgr.metadataFetchDelayMs(time.milliseconds()));
+    }
+
+    @Test
+    public void testAuthenticationFailure() {
+        mgr.transitionToUpdatePending(time.milliseconds());
+        mgr.updateFailed(new AuthenticationException("Authentication failed"));
+        assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+        try {
+            mgr.isReady();
+            fail("Expected AuthenticationException to be thrown");
+        } catch (AuthenticationException e) {
+            // Expected
+        }
+
+        mgr.update(mockCluster(), time.milliseconds());
+        assertTrue(mgr.isReady());
+    }
+
+    private static Cluster mockCluster() {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+        nodes.put(1, new Node(1, "localhost", 8122));
+        nodes.put(2, new Node(2, "localhost", 8123));
+        return new Cluster("mockClusterId", nodes.values(),
+                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.