You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/09 21:27:38 UTC

[kafka] branch trunk updated: KAFKA-6299; Fix AdminClient error handling when metadata changes (#4295)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abbd53d  KAFKA-6299; Fix AdminClient error handling when metadata changes (#4295)
abbd53d is described below

commit abbd53da4ad98c3a95118c9770a9d247e54b0eef
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Wed May 9 14:27:28 2018 -0700

    KAFKA-6299; Fix AdminClient error handling when metadata changes (#4295)
    
    When AdminClient gets a NOT_CONTROLLER error, it should refresh its metadata and retry the request, rather than forcing the end user to handle a NotControllerException.
    
    Move AdminClient's metadata management out of NetworkClient and into AdminMetadataManager. This will make it easier to implement more sophisticated metadata management in the future, such as a NodeProvider that fetches topic leaders.
    
    Rather than operating on newCalls in place, the AdminClient service thread now drains it into pendingCalls. This minimizes the locking required, since pendingCalls is only accessed from the service thread.
---
 .../org/apache/kafka/clients/MetadataUpdater.java  |   2 +-
 .../kafka/clients/admin/KafkaAdminClient.java      | 372 +++++++++++++--------
 .../admin/internal/AdminMetadataManager.java       | 247 ++++++++++++++
 .../main/java/org/apache/kafka/common/Cluster.java |   3 +-
 .../java/org/apache/kafka/clients/MockClient.java  |  26 ++
 .../clients/admin/AdminClientUnitTestEnv.java      |  23 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 123 +++++--
 .../apache/kafka/connect/util/TopicAdminTest.java  |   3 +-
 8 files changed, 626 insertions(+), 173 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 1267283..09ed995 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -29,7 +29,7 @@ import java.util.List;
  * <p>
  * This class is not thread-safe!
  */
-interface MetadataUpdater {
+public interface MetadataUpdater {
 
     /**
      * Gets the current cluster info without blocking.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d8c0bad..70e9fbd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,11 +22,11 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -44,7 +44,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -186,9 +185,9 @@ public class KafkaAdminClient extends AdminClient {
     private final Time time;
 
     /**
-     * The cluster metadata used by the KafkaClient.
+     * The cluster metadata manager used by the KafkaClient.
      */
-    private final Metadata metadata;
+    private final AdminMetadataManager metadataManager;
 
     /**
      * The metrics for this KafkaAdminClient.
@@ -327,8 +326,9 @@ public class KafkaAdminClient extends AdminClient {
         try {
             // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
             // simplifies communication with older brokers)
-            Metadata metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
+            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@@ -344,7 +344,7 @@ public class KafkaAdminClient extends AdminClient {
                     metrics, time, metricGrpPrefix, channelBuilder, logContext);
             networkClient = new NetworkClient(
                 selector,
-                metadata,
+                metadataManager.updater(),
                 clientId,
                 1,
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
@@ -356,7 +356,7 @@ public class KafkaAdminClient extends AdminClient {
                 true,
                 apiVersions,
                 logContext);
-            return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient,
+            return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
                 timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -367,35 +367,39 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
-    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata, Time time) {
+    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Time time) {
         Metrics metrics = null;
         String clientId = generateClientId(config);
 
         try {
             metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
-            return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null,
-                    createLogContext(clientId));
+            LogContext logContext = createLogContext(clientId);
+            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, time,
+                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
+                client, null, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
             throw new KafkaException("Failed create new KafkaAdminClient", exc);
         }
     }
 
-    private static LogContext createLogContext(String clientId) {
+    static LogContext createLogContext(String clientId) {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
-                     Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory,
-                     LogContext logContext) {
+    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time,
+                AdminMetadataManager metadataManager, Metrics metrics, KafkaClient client,
+                TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) {
         this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.clientId = clientId;
         this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
-        this.metadata = metadata;
+        this.metadataManager = metadataManager;
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
             config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds(), null);
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
@@ -462,6 +466,13 @@ public class KafkaAdminClient extends AdminClient {
         Node provide();
     }
 
+    private class MetadataUpdateNodeIdProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return client.leastLoadedNode(time.milliseconds());
+        }
+    }
+
     private class ConstantNodeIdProvider implements NodeProvider {
         private final int nodeId;
 
@@ -471,7 +482,16 @@ public class KafkaAdminClient extends AdminClient {
 
         @Override
         public Node provide() {
-            return metadata.fetch().nodeById(nodeId);
+            if (metadataManager.isReady() &&
+                    (metadataManager.nodeById(nodeId) != null)) {
+                return metadataManager.nodeById(nodeId);
+            }
+            // If we can't find the node with the given constant ID, we schedule a
+            // metadata update and hope it appears.  This behavior is useful for avoiding
+            // flaky behavior in tests when the cluster is starting up and not all nodes
+            // have appeared.
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
@@ -481,7 +501,12 @@ public class KafkaAdminClient extends AdminClient {
     private class ControllerNodeProvider implements NodeProvider {
         @Override
         public Node provide() {
-            return metadata.fetch().controller();
+            if (metadataManager.isReady() &&
+                    (metadataManager.controller() != null)) {
+                return metadataManager.controller();
+            }
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
@@ -491,23 +516,40 @@ public class KafkaAdminClient extends AdminClient {
     private class LeastLoadedNodeProvider implements NodeProvider {
         @Override
         public Node provide() {
-            return client.leastLoadedNode(time.milliseconds());
+            if (metadataManager.isReady()) {
+                // This may return null if all nodes are busy.
+                // In that case, we will postpone node assignment.
+                return client.leastLoadedNode(time.milliseconds());
+            }
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
     abstract class Call {
+        private final boolean internal;
         private final String callName;
         private final long deadlineMs;
         private final NodeProvider nodeProvider;
         private int tries = 0;
         private boolean aborted = false;
+        private Node curNode = null;
 
-        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+        Call(boolean internal, String callName, long deadlineMs, NodeProvider nodeProvider) {
+            this.internal = internal;
             this.callName = callName;
             this.deadlineMs = deadlineMs;
             this.nodeProvider = nodeProvider;
         }
 
+        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+            this(false, callName, deadlineMs, nodeProvider);
+        }
+
+        protected Node curNode() {
+            return curNode;
+        }
+
         /**
          * Handle a failure.
          *
@@ -615,6 +657,10 @@ public class KafkaAdminClient extends AdminClient {
         public String toString() {
             return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + ")";
         }
+
+        public boolean isInternal() {
+            return internal;
+        }
     }
 
     static class TimeoutProcessorFactory {
@@ -698,43 +744,15 @@ public class KafkaAdminClient extends AdminClient {
         private List<Call> newCalls = new LinkedList<>();
 
         /**
-         * Check if the AdminClient metadata is ready.
-         * We need to know who the controller is, and have a non-empty view of the cluster.
-         *
-         * @param prevMetadataVersion       The previous metadata version which wasn't usable.
-         * @return                          null if the metadata is usable; the current metadata
-         *                                  version otherwise
-         */
-        private Integer checkMetadataReady(Integer prevMetadataVersion) {
-            if (prevMetadataVersion != null) {
-                if (prevMetadataVersion == metadata.version())
-                    return prevMetadataVersion;
-            }
-            Cluster cluster = metadata.fetch();
-            if (cluster.nodes().isEmpty()) {
-                log.trace("Metadata is not ready yet. No cluster nodes found.");
-                return metadata.requestUpdate();
-            }
-            if (cluster.controller() == null) {
-                log.trace("Metadata is not ready yet. No controller found.");
-                return metadata.requestUpdate();
-            }
-            if (prevMetadataVersion != null) {
-                log.trace("Metadata is now ready.");
-            }
-            return null;
-        }
-
-        /**
-         * Time out the elements in the newCalls list which are expired.
+         * Time out the elements in the pendingCalls list which are expired.
          *
          * @param processor     The timeout processor.
          */
-        private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
-            int numTimedOut = processor.handleTimeouts(newCalls,
+        private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingCalls) {
+            int numTimedOut = processor.handleTimeouts(pendingCalls,
                     "Timed out waiting for a node assignment.");
             if (numTimedOut > 0)
-                log.debug("Timed out {} new calls.", numTimedOut);
+                log.debug("Timed out {} pending calls.", numTimedOut);
         }
 
         /**
@@ -743,7 +761,7 @@ public class KafkaAdminClient extends AdminClient {
          * @param processor     The timeout processor.
          * @param callsToSend   A map of nodes to the calls they need to handle.
          */
-        private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
+        private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
             int numTimedOut = 0;
             for (List<Call> callList : callsToSend.values()) {
                 numTimedOut += processor.handleTimeouts(callList,
@@ -751,48 +769,52 @@ public class KafkaAdminClient extends AdminClient {
             }
             if (numTimedOut > 0)
                 log.debug("Timed out {} call(s) with assigned nodes.", numTimedOut);
+            return numTimedOut;
         }
 
         /**
-         * Choose nodes for the calls in the callsToSend list.
+         * Drain all the calls from newCalls into pendingCalls.
          *
          * This function holds the lock for the minimum amount of time, to avoid blocking
          * users of AdminClient who will also take the lock to add new calls.
-         *
-         * @param now           The current time in milliseconds.
-         * @param callsToSend   A map of nodes to the calls they need to handle.
-         *
          */
-        private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> callsToSend) {
-            List<Call> newCallsToAdd = null;
-            synchronized (this) {
-                if (newCalls.isEmpty()) {
-                    return;
-                }
-                newCallsToAdd = newCalls;
-                newCalls = new LinkedList<>();
-            }
-            for (Call call : newCallsToAdd) {
-                chooseNodeForNewCall(now, callsToSend, call);
+        private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+            if (!newCalls.isEmpty()) {
+                pendingCalls.addAll(newCalls);
+                newCalls.clear();
             }
         }
 
         /**
-         * Choose a node for a new call.
+         * Choose nodes for the calls in the pendingCalls list.
          *
          * @param now           The current time in milliseconds.
+         * @param pendingIter   An iterator yielding pending calls.
          * @param callsToSend   A map of nodes to the calls they need to handle.
-         * @param call          The call.
+         *
          */
-        private void chooseNodeForNewCall(long now, Map<Node, List<Call>> callsToSend, Call call) {
-            Node node = call.nodeProvider.provide();
-            if (node == null) {
-                call.fail(now, new BrokerNotAvailableException(
-                    String.format("Error choosing node for %s: no node found.", call.callName)));
-                return;
+        private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter,
+                Map<Node, List<Call>> callsToSend) {
+            while (pendingIter.hasNext()) {
+                Call call = pendingIter.next();
+                Node node = null;
+                try {
+                    node = call.nodeProvider.provide();
+                } catch (Throwable t) {
+                    // Handle authentication errors while choosing nodes.
+                    log.debug("Unable to choose node for {}", call, t);
+                    pendingIter.remove();
+                    call.fail(now, t);
+                }
+                if (node != null) {
+                    log.trace("Assigned {} to node {}", call, node);
+                    pendingIter.remove();
+                    call.curNode = node;
+                    getOrCreateListValue(callsToSend, node).add(call);
+                } else {
+                    log.trace("Unable to assign {} to a node.", call);
+                }
             }
-            log.trace("Assigned {} to {}", call, node);
-            getOrCreateListValue(callsToSend, node).add(call);
         }
 
         /**
@@ -881,37 +903,6 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         /**
-         * If an authentication exception is encountered with connection to any broker,
-         * fail all pending requests.
-         */
-        private void handleAuthenticationException(long now, Map<Node, List<Call>> callsToSend) {
-            AuthenticationException authenticationException = metadata.getAndClearAuthenticationException();
-            if (authenticationException == null) {
-                for (Node node : callsToSend.keySet()) {
-                    authenticationException = client.authenticationException(node);
-                    if (authenticationException != null)
-                        break;
-                }
-            }
-            if (authenticationException != null) {
-                synchronized (this) {
-                    failCalls(now, newCalls, authenticationException);
-                }
-                for (List<Call> calls : callsToSend.values()) {
-                    failCalls(now, calls, authenticationException);
-                }
-                callsToSend.clear();
-            }
-        }
-
-        private void failCalls(long now, List<Call> calls, AuthenticationException authenticationException) {
-            for (Call call : calls) {
-                call.fail(now, authenticationException);
-            }
-            calls.clear();
-        }
-
-        /**
          * Handle responses from the server.
          *
          * @param now                   The current time in milliseconds.
@@ -952,9 +943,14 @@ public class KafkaAdminClient extends AdminClient {
                 if (response.versionMismatch() != null) {
                     call.fail(now, response.versionMismatch());
                 } else if (response.wasDisconnected()) {
-                    call.fail(now, new DisconnectException(String.format(
-                        "Cancelled %s request with correlation id %s due to node %s being disconnected",
-                        call.callName, correlationId, response.destination())));
+                    AuthenticationException authException = client.authenticationException(call.curNode());
+                    if (authException != null) {
+                        call.fail(now, authException);
+                    } else {
+                        call.fail(now, new DisconnectException(String.format(
+                            "Cancelled %s request with correlation id %s due to node %s being disconnected",
+                            call.callName, correlationId, response.destination())));
+                    }
                 } else {
                     try {
                         call.handleResponse(response.responseBody());
@@ -970,13 +966,41 @@ public class KafkaAdminClient extends AdminClient {
             }
         }
 
-        private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs,
+        private boolean hasActiveExternalCalls(Collection<Call> calls) {
+            for (Call call : calls) {
+                if (!call.isInternal()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Return true if there are currently active external calls.
+         */
+        private boolean hasActiveExternalCalls(List<Call> pendingCalls,
                 Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
-            if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty()) {
+            if (hasActiveExternalCalls(pendingCalls)) {
+                return true;
+            }
+            for (List<Call> callList : callsToSend.values()) {
+                if (hasActiveExternalCalls(callList)) {
+                    return true;
+                }
+            }
+            if (hasActiveExternalCalls(correlationIdToCalls.values())) {
+                return true;
+            }
+            return false;
+        }
+
+        private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call> pendingCalls,
+                Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
+            if (!hasActiveExternalCalls(pendingCalls, callsToSend, correlationIdToCalls)) {
                 log.trace("All work has been completed, and the I/O thread is now exiting.");
                 return true;
             }
-            if (now > curHardShutdownTimeMs) {
+            if (now >= curHardShutdownTimeMs) {
                 log.info("Forcing a hard I/O thread shutdown. Requests in progress will be aborted.");
                 return true;
             }
@@ -986,38 +1010,46 @@ public class KafkaAdminClient extends AdminClient {
 
         @Override
         public void run() {
-            /*
+            /**
+             * Calls which have not yet been assigned to a node.
+             * Only accessed from this thread.
+             */
+            ArrayList<Call> pendingCalls = new ArrayList<>();
+
+            /**
              * Maps nodes to calls that we want to send.
+             * Only accessed from this thread.
              */
             Map<Node, List<Call>> callsToSend = new HashMap<>();
 
-            /*
+            /**
              * Maps node ID strings to calls that have been sent.
+             * Only accessed from this thread.
              */
             Map<String, List<Call>> callsInFlight = new HashMap<>();
 
-            /*
+            /**
              * Maps correlation IDs to calls that have been sent.
+             * Only accessed from this thread.
              */
             Map<Integer, Call> correlationIdToCalls = new HashMap<>();
 
-            /*
-             * The previous metadata version which wasn't usable, or null if there is none.
-             */
-            Integer prevMetadataVersion = null;
-
             long now = time.milliseconds();
             log.trace("Thread starting");
             while (true) {
+                // Copy newCalls into pendingCalls.
+                drainNewCalls(pendingCalls);
+
                 // Check if the AdminClient thread should shut down.
                 long curHardShutdownTimeMs = hardShutdownTimeMs.get();
                 if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
-                        threadShouldExit(now, curHardShutdownTimeMs, callsToSend, correlationIdToCalls))
+                        threadShouldExit(now, curHardShutdownTimeMs, pendingCalls,
+                            callsToSend, correlationIdToCalls))
                     break;
 
                 // Handle timeouts.
                 TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
-                timeoutNewCalls(timeoutProcessor);
+                timeoutPendingCalls(timeoutProcessor, pendingCalls);
                 timeoutCallsToSend(timeoutProcessor, callsToSend);
                 timeoutCallsInFlight(timeoutProcessor, callsInFlight);
 
@@ -1026,12 +1058,22 @@ public class KafkaAdminClient extends AdminClient {
                     pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
                 }
 
-                // Handle new calls and metadata update requests.
-                prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
-                if (prevMetadataVersion == null) {
-                    chooseNodesForNewCalls(now, callsToSend);
-                    pollTimeout = Math.min(pollTimeout,
-                        sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+                // Choose nodes for our pending calls.
+                chooseNodesForPendingCalls(now, pendingCalls.iterator(), callsToSend);
+                long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
+                if (metadataFetchDelayMs == 0) {
+                    metadataManager.transitionToUpdatePending(now);
+                    Call metadataCall = makeMetadataCall(now);
+                    // Create a new metadata fetch call and add it to the end of pendingCalls.
+                    // Assign a node for just the new call (we handled the other pending nodes above).
+                    pendingCalls.add(metadataCall);
+                    chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1),
+                        callsToSend);
+                }
+                pollTimeout = Math.min(pollTimeout,
+                    sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+                if (metadataFetchDelayMs > 0) {
+                    pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                 }
 
                 // Wait for network responses.
@@ -1041,7 +1083,6 @@ public class KafkaAdminClient extends AdminClient {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
-                handleAuthenticationException(now, callsToSend);
                 handleResponses(now, responses, callsInFlight, correlationIdToCalls);
             }
             int numTimedOut = 0;
@@ -1051,10 +1092,13 @@ public class KafkaAdminClient extends AdminClient {
                         "The AdminClient thread has exited.");
                 newCalls = null;
             }
+            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
+                "The AdminClient thread has exited.");
+            numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
             numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                     "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
-                log.debug("Timed out {} remaining operations.", numTimedOut);
+                log.debug("Timed out {} remaining operation(s).", numTimedOut);
             }
             closeQuietly(client, "KafkaClient");
             closeQuietly(metrics, "Metrics");
@@ -1106,6 +1150,40 @@ public class KafkaAdminClient extends AdminClient {
                 enqueue(call, now);
             }
         }
+
+        /**
+         * Create a new metadata call.
+         */
+        private Call makeMetadataCall(long now) {
+            return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs),
+                    new MetadataUpdateNodeIdProvider()) {
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // Since this only requests node information, it's safe to pass true
+                    // for allowAutoTopicCreation (and it simplifies communication with
+                    // older brokers)
+                    return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    MetadataResponse response = (MetadataResponse) abstractResponse;
+                    metadataManager.update(response.cluster(), time.milliseconds(), null);
+                }
+
+                @Override
+                public void handleFailure(Throwable e) {
+                    if (e instanceof AuthenticationException) {
+                        log.info("Unable to fetch cluster metadata from node {} because of " +
+                            "authentication error", curNode(), e);
+                        metadataManager.update(Cluster.empty(), time.milliseconds(), (AuthenticationException) e);
+                    } else {
+                        log.info("Unable to fetch cluster metadata from node {}",
+                            curNode(), e);
+                    }
+                }
+            };
+        }
     }
 
     /**
@@ -1149,6 +1227,14 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
+                // Check for controller change
+                for (ApiError error : response.errors().values()) {
+                    if (error.error() == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
@@ -1212,6 +1298,14 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
+                // Check for controller change
+                for (Errors error : response.errors().values()) {
+                    if (error == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
@@ -1982,6 +2076,14 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
+                // Check for controller change
+                for (ApiError error : response.errors().values()) {
+                    if (error.error() == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 for (Map.Entry<String, ApiError> result : response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = futures.get(result.getKey());
                     if (result.getValue().isSuccess()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
new file mode 100644
index 0000000..63e7fc8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internal;
+
+import org.apache.kafka.clients.MetadataUpdater;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manages the metadata for KafkaAdminClient.
+ *
+ * This class is not thread-safe.  It is only accessed from the AdminClient
+ * service thread (which also uses the NetworkClient).
+ */
+public class AdminMetadataManager {
+    private Logger log;
+
+    /**
+     * The timer.
+     */
+    private final Time time;
+
+    /**
+     * The minimum amount of time that we should wait between subsequent
+     * retries, when fetching metadata.
+     */
+    private final long refreshBackoffMs;
+
+    /**
+     * The minimum amount of time that we should wait before triggering an
+     * automatic metadata refresh.
+     */
+    private final long metadataExpireMs;
+
+    /**
+     * Used to update the NetworkClient metadata.
+     */
+    private final AdminMetadataUpdater updater;
+
+    /**
+     * The current metadata state.
+     */
+    private State state = State.QUIESCENT;
+
+    /**
+     * The time in wall-clock milliseconds when we last updated the metadata.
+     */
+    private long lastMetadataUpdateMs = 0;
+
+    /**
+     * The time in wall-clock milliseconds when we last attempted to fetch new
+     * metadata.
+     */
+    private long lastMetadataFetchAttemptMs = 0;
+
+    /**
+     * The current cluster information.
+     */
+    private Cluster cluster = Cluster.empty();
+
+    /**
+     * If we got an authentication exception when we last attempted to fetch
+     * metadata, this is it; null, otherwise.
+     */
+    private AuthenticationException authException = null;
+
+    public class AdminMetadataUpdater implements MetadataUpdater {
+        @Override
+        public List<Node> fetchNodes() {
+            return cluster.nodes();
+        }
+
+        @Override
+        public boolean isUpdateDue(long now) {
+            return false;
+        }
+
+        @Override
+        public long maybeUpdate(long now) {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public void handleDisconnection(String destination) {
+            // Do nothing
+        }
+
+        @Override
+        public void handleAuthenticationFailure(AuthenticationException e) {
+            log.info("AdminMetadataManager got AuthenticationException", e);
+            update(Cluster.empty(), time.milliseconds(), e);
+        }
+
+        @Override
+        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse) {
+            // Do nothing
+        }
+
+        @Override
+        public void requestUpdate() {
+            // Do nothing
+        }
+    }
+
+    /**
+     * The current AdminMetadataManager state.
+     */
+    enum State {
+        QUIESCENT,
+        UPDATE_REQUESTED,
+        UPDATE_PENDING;
+    }
+
+    public AdminMetadataManager(LogContext logContext, Time time, long refreshBackoffMs,
+                                long metadataExpireMs) {
+        this.log = logContext.logger(AdminMetadataManager.class);
+        this.time = time;
+        this.refreshBackoffMs = refreshBackoffMs;
+        this.metadataExpireMs = metadataExpireMs;
+        this.updater = new AdminMetadataUpdater();
+    }
+
+    public AdminMetadataUpdater updater() {
+        return updater;
+    }
+
+    public boolean isReady() {
+        if (authException != null) {
+            log.debug("Metadata is not usable: failed to get metadata.", authException);
+            throw authException;
+        }
+        if (cluster.nodes().isEmpty()) {
+            log.trace("Metadata is not ready: bootstrap nodes have not been " +
+                "initialized yet.");
+            return false;
+        }
+        if (cluster.isBootstrapConfigured()) {
+            log.trace("Metadata is not ready: we have not fetched metadata from " +
+                "the bootstrap nodes yet.");
+            return false;
+        }
+        log.trace("Metadata is ready to use.");
+        return true;
+    }
+
+    public Node controller() {
+        return cluster.controller();
+    }
+
+    public Node nodeById(int nodeId) {
+        return cluster.nodeById(nodeId);
+    }
+
+    public void requestUpdate() {
+        if (state == State.QUIESCENT) {
+            state = State.UPDATE_REQUESTED;
+            log.debug("Requesting metadata update.");
+        }
+    }
+
+    public void clearController() {
+        if (cluster.controller() != null) {
+            log.trace("Clearing cached controller node {}.", cluster.controller());
+            this.cluster = new Cluster(cluster.clusterResource().clusterId(),
+                cluster.nodes(),
+                Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                null);
+        }
+    }
+
+    /**
+     * Return how many milliseconds to wait before the AdminClient should fetch new metadata.
+     */
+    public long metadataFetchDelayMs(long now) {
+        switch (state) {
+            case QUIESCENT:
+                // Calculate the time remaining until the next periodic update.
+                // We want to avoid making many metadata requests in a short amount of time,
+                // so there is a metadata refresh backoff period.
+                long timeSinceUpdate = now - lastMetadataUpdateMs;
+                long timeRemainingUntilUpdate = metadataExpireMs - timeSinceUpdate;
+                long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
+                long timeRemainingUntilAttempt = refreshBackoffMs - timeSinceAttempt;
+                return Math.max(Math.max(0L, timeRemainingUntilUpdate), timeRemainingUntilAttempt);
+            case UPDATE_REQUESTED:
+                // An update has been explicitly requested.  Do it as soon as possible.
+                return 0;
+            default:
+                // An update is already pending, so we don't need to initiate another one.
+                return Long.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Transition into the UPDATE_PENDING state.  Updates lastMetadataFetchAttemptMs.
+     */
+    public void transitionToUpdatePending(long now) {
+        this.state = State.UPDATE_PENDING;
+        this.lastMetadataFetchAttemptMs = now;
+    }
+
+    /**
+     * Receive new metadata, and transition into the QUIESCENT state.
+     * Updates lastMetadataUpdateMs, cluster, and authException.
+     */
+    public void update(Cluster cluster, long now, AuthenticationException authException) {
+        if (cluster.isBootstrapConfigured()) {
+            log.debug("Setting bootstrap cluster metadata {}.", cluster);
+        } else {
+            log.debug("Received cluster metadata {}{}.",
+                cluster, authException == null ? "" : " with authentication exception.");
+        }
+        this.state = State.QUIESCENT;
+        this.lastMetadataUpdateMs = now;
+        this.authException = authException;
+        if (!cluster.nodes().isEmpty()) {
+            this.cluster = cluster;
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0c59f33..ccbaa30 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -274,7 +274,8 @@ public final class Cluster {
 
     @Override
     public String toString() {
-        return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
+        return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes +
+            ", partitions = " + this.partitionsByTopicPartition.values() + ", controller = " + controller + ")";
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 37b43e5..7a8ba1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -81,6 +81,7 @@ public class MockClient implements KafkaClient {
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
     private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
@@ -128,11 +129,16 @@ public class MockClient implements KafkaClient {
     }
 
     public void authenticationFailed(Node node, long duration) {
+        pendingAuthenticationErrors.remove(node);
         authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
         disconnect(node.idString());
         blackout(node, duration);
     }
 
+    public void createPendingAuthenticationError(Node node, long blackoutMs) {
+        pendingAuthenticationErrors.put(node, blackoutMs);
+    }
+
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -174,6 +180,26 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void send(ClientRequest request, long now) {
+        // Check if the request is directed to a node with a pending authentication error.
+        for (Iterator<Map.Entry<Node, Long>> authErrorIter =
+             pendingAuthenticationErrors.entrySet().iterator(); authErrorIter.hasNext(); ) {
+            Map.Entry<Node, Long> entry = authErrorIter.next();
+            Node node = entry.getKey();
+            long blackoutMs = entry.getValue();
+            if (node.idString().equals(request.destination())) {
+                authErrorIter.remove();
+                // Set up a disconnected ClientResponse and create an authentication error
+                // for the affected node.
+                authenticationFailed(node, blackoutMs);
+                AbstractRequest.Builder<?> builder = request.requestBuilder();
+                short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+                    builder.latestAllowedVersion());
+                ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), true, null, null);
+                responses.add(resp);
+                return;
+            }
+        }
         Iterator<FutureResponse> iterator = futureResponses.iterator();
         while (iterator.hasNext()) {
             FutureResponse futureResp = iterator.next();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 10281fb..f862c14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -16,11 +16,12 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,13 +51,25 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
+        this(newMockClient(time, cluster), time, cluster, config);
+    }
+
+    private static MockClient newMockClient(Time time, Cluster cluster) {
+        MockClient mockClient = new MockClient(time);
+        mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
+            cluster.clusterResource().clusterId(),
+            cluster.controller().id(),
+            Collections.<MetadataResponse.TopicMetadata>emptyList()));
+        return mockClient;
+    }
+
+    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster,
+                                  Map<String, Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
-        Metadata metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
-        this.mockClient = new MockClient(time, metadata);
-        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata, time);
+        this.mockClient = mockClient;
+        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, time);
     }
 
     public Time time() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0debed3..cdd9a28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -65,6 +67,8 @@ import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Ignore;
@@ -86,6 +90,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
@@ -169,15 +174,18 @@ public class KafkaAdminClientTest {
                 KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
     }
 
-    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+    private static Cluster mockCluster(int controllerIndex) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
         nodes.put(1, new Node(1, "localhost", 8122));
         nodes.put(2, new Node(2, "localhost", 8123));
-        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+        return new Cluster("mockClusterId", nodes.values(),
                 Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
-                Collections.<String>emptySet(), nodes.get(0));
-        return new AdminClientUnitTestEnv(cluster, configVals);
+                Collections.<String>emptySet(), nodes.get(controllerIndex));
+    }
+
+    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+        return new AdminClientUnitTestEnv(mockCluster(0), configVals);
     }
 
     @Test
@@ -204,7 +212,13 @@ public class KafkaAdminClientTest {
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
+                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
@@ -215,6 +229,30 @@ public class KafkaAdminClientTest {
         }
     }
 
+    /**
+     * Test that we propagate exceptions encountered when fetching metadata.
+     */
+    @Test
+    public void testPropagatedMetadataFetchException() throws Exception {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+            TimeUnit.DAYS.toMillis(1));
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
+                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                new CreateTopicsOptions().timeoutMs(1000)).all();
+            assertFutureError(future, SaslAuthenticationException.class);
+        }
+    }
+
     @Test
     public void testCreateTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -230,6 +268,30 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testCreateTopicsHandleNotControllerException() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(mockCluster(0), Collections.<String>emptySet());
+            env.kafkaClient().prepareMetadataUpdate(mockCluster(1), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+                Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
+                env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(),
+                env.cluster().clusterResource().clusterId(),
+                1,
+                Collections.<MetadataResponse.TopicMetadata>emptyList()));
+            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+                    Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
+                env.cluster().nodeById(1));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                new CreateTopicsOptions().timeoutMs(10000)).all();
+            future.get();
+        }
+    }
+
+    @Test
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -266,33 +328,31 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
-        AdminClientUnitTestEnv env = mockClientEnv();
-        Node node = env.cluster().controller();
-        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-        env.kafkaClient().setNode(node);
-        env.kafkaClient().authenticationFailed(node, 300);
-
-        callAdminClientApisAndExpectAnAuthenticationError(env);
-
-        // wait less than the blackout period, the connection should fail and the authentication error should remain
-        env.time().sleep(30);
-        assertTrue(env.kafkaClient().connectionFailed(node));
-        callAdminClientApisAndExpectAnAuthenticationError(env);
-
-        env.close();
+    public void testAdminClientApisAuthenticationFailure() throws Exception {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+            TimeUnit.DAYS.toMillis(1));
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
+                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNode(env.cluster().controller());
+            callAdminClientApisAndExpectAnAuthenticationError(env);
+        }
     }
 
     private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
-        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
-
         try {
             env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(10000)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
@@ -302,35 +362,40 @@ public class KafkaAdminClientTest {
             env.adminClient().createPartitions(counts).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().describeAcls(FILTER1).values().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
     }
 
@@ -918,7 +983,7 @@ public class KafkaAdminClientTest {
             }
 
             boolean callHasExpired(KafkaAdminClient.Call call) {
-                if (shouldInjectFailure()) {
+                if ((!call.isInternal()) && shouldInjectFailure()) {
                     log.debug("Injecting timeout for {}.", call);
                     return true;
                 } else {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index cda6879..5b1e155 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -52,7 +52,6 @@ public class TopicAdminTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNode(cluster.controller());
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);
@@ -65,7 +64,7 @@ public class TopicAdminTest {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(cluster.nodes().iterator().next());
             env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.