Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/02 23:18:29 UTC

[kafka] branch 2.4 updated: KAFKA-8933; Fix NPE in DefaultMetadataUpdater after authentication failure (#7682)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 455fbf3  KAFKA-8933; Fix NPE in DefaultMetadataUpdater after authentication failure (#7682)
455fbf3 is described below

commit 455fbf3928efd3363992f6acd18904c489e4f0c8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Dec 4 13:12:17 2019 -0800

    KAFKA-8933; Fix NPE in DefaultMetadataUpdater after authentication failure (#7682)
    
    This patch fixes an NPE in `DefaultMetadataUpdater` caused by an inconsistency in event expectations. Whenever there was an authentication failure, we treated it as a failed update even if it came from a connection other than the one carrying an in-flight metadata request. The fix makes the `MetadataUpdater` API clearer about which events each callback handles.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../kafka/clients/ManualMetadataUpdater.java       |  23 ++---
 .../java/org/apache/kafka/clients/Metadata.java    |  17 +++-
 .../org/apache/kafka/clients/MetadataUpdater.java  |  29 +++---
 .../org/apache/kafka/clients/NetworkClient.java    | 112 ++++++++++-----------
 .../admin/internals/AdminMetadataManager.java      |  17 ++--
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../kafka/clients/producer/KafkaProducer.java      |   2 +-
 .../producer/internals/ProducerMetadata.java       |   7 +-
 .../org/apache/kafka/clients/MetadataTest.java     |  19 +++-
 .../apache/kafka/clients/NetworkClientTest.java    |  77 +++++++++++++-
 .../internals/ConsumerNetworkClientTest.java       |   4 +-
 .../clients/consumer/internals/FetcherTest.java    |   2 +-
 .../producer/internals/ProducerMetadataTest.java   |   8 +-
 .../ClientAuthenticationFailureTest.java           |  40 +++-----
 .../java/org/apache/kafka/test/MockSelector.java   |  20 ++--
 .../runtime/distributed/WorkerGroupMember.java     |   2 +-
 .../kafka/admin/BrokerApiVersionsCommand.scala     |   2 +-
 17 files changed, 219 insertions(+), 164 deletions(-)
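
For quick reference, the reshaped `MetadataUpdater` callback surface introduced by this patch (a summary of the interface diff below, with the old names in trailing comments):

    // New MetadataUpdater callbacks (see the full interface diff below):
    void handleServerDisconnect(long now, String nodeId,
            Optional<AuthenticationException> maybeAuthException);   // was handleDisconnection(String)
    void handleFailedRequest(long now,
            Optional<KafkaException> maybeFatalException);           // was handleFatalException(KafkaException)
    void handleSuccessfulResponse(RequestHeader requestHeader, long now,
            MetadataResponse metadataResponse);                      // was handleCompletedMetadataResponse(...)
    // requestUpdate() is removed from the interface; each implementation now
    // decides internally when a disconnect should trigger a refresh.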

diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 7fb0224..c1c1fba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -18,13 +18,13 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A simple implementation of `MetadataUpdater` that returns the cluster nodes set via the constructor or via
@@ -36,9 +36,6 @@ import java.util.List;
  * This class is not thread-safe!
  */
 public class ManualMetadataUpdater implements MetadataUpdater {
-
-    private static final Logger log = LoggerFactory.getLogger(ManualMetadataUpdater.class);
-
     private List<Node> nodes;
 
     public ManualMetadataUpdater() {
@@ -69,24 +66,18 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     }
 
     @Override
-    public void handleDisconnection(String destination) {
-        // Do nothing
-    }
-
-    @Override
-    public void handleFatalException(KafkaException exception) {
-        // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
-        // for failure.
-        log.debug("An error occurred in broker-to-broker communication.", exception);
+    public void handleServerDisconnect(long now, String nodeId, Optional<AuthenticationException> maybeAuthException) {
+        // We don't fail the broker on failures. There should be sufficient information from
+        // the NetworkClient logs to indicate the reason for the failure.
     }
 
     @Override
-    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
+    public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
         // Do nothing
     }
 
     @Override
-    public void requestUpdate() {
+    public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
         // Do nothing
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index a3ecdf8..b6fdc06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -209,10 +209,8 @@ public class Metadata implements Closeable {
         }
     }
 
-    public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
+    public synchronized void bootstrap(List<InetSocketAddress> addresses) {
         this.needUpdate = true;
-        this.lastRefreshMs = now;
-        this.lastSuccessfulRefreshMs = now;
         this.updateVersion += 1;
         this.cache = MetadataCache.bootstrap(addresses);
     }
@@ -441,9 +439,18 @@ public class Metadata implements Closeable {
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
      */
-    public synchronized void failedUpdate(long now, KafkaException fatalException) {
+    public synchronized void failedUpdate(long now) {
         this.lastRefreshMs = now;
-        this.fatalException = fatalException;
+    }
+
+    /**
+     * Propagate a fatal error which affects the ability to fetch metadata for the cluster.
+     * Two examples are authentication and unsupported version exceptions.
+     *
+     * @param exception The fatal exception
+     */
+    public synchronized void fatalError(KafkaException exception) {
+        this.fatalException = exception;
     }
 
     /**
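
The `Metadata` change above splits the old two-in-one `failedUpdate(long, KafkaException)` into separate bookkeeping and error channels. A minimal usage sketch, following the constructor and calls used by the updated tests later in this patch (illustration only, not code from the commit):

    // Transient vs. fatal metadata failures after this patch:
    Metadata metadata = new Metadata(100L, 5000L, new LogContext(), new ClusterResourceListeners());
    metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9092)));

    // Transient failure: only stamps lastRefreshMs so the next attempt backs off.
    metadata.failedUpdate(System.currentTimeMillis());

    // Fatal failure: recorded separately and rethrown to the caller on the next
    // poll/awaitUpdate, independent of refresh bookkeeping.
    metadata.fatalError(new AuthenticationException("Authentication failed"));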
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index e2261d5..77f3efa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -18,11 +18,14 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
 import java.io.Closeable;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * The interface used by `NetworkClient` to request cluster metadata info to be updated and to retrieve the cluster nodes
@@ -46,7 +49,7 @@ public interface MetadataUpdater extends Closeable {
      * Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
      * be 0 if an update has been started as a result of this call).
      *
-     * If the implementation relies on `NetworkClient` to send requests, `handleCompletedMetadataResponse` will be
+     * If the implementation relies on `NetworkClient` to send requests, `handleSuccessfulResponse` will be
      * invoked after the metadata response is received.
      *
      * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
@@ -55,20 +58,24 @@ public interface MetadataUpdater extends Closeable {
     long maybeUpdate(long now);
 
     /**
-     * Handle disconnections for metadata requests.
+     * Handle a server disconnect.
      *
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
      * requests with special handling for disconnections of such requests.
-     * @param destination
+     *
+     * @param now Current time in milliseconds
+     * @param nodeId The id of the node that disconnected
+     * @param maybeAuthException Optional authentication error
      */
-    void handleDisconnection(String destination);
+    void handleServerDisconnect(long now, String nodeId, Optional<AuthenticationException> maybeAuthException);
 
     /**
-     * Handle failure. Propagate the exception if awaiting metadata.
+     * Handle a metadata request failure.
      *
-     * @param fatalException exception corresponding to the failure
+     * @param now Current time in milliseconds
+     * @param maybeFatalException Optional fatal error (e.g. {@link UnsupportedVersionException})
      */
-    void handleFatalException(KafkaException fatalException);
+    void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException);
 
     /**
      * Handle responses for metadata requests.
@@ -76,13 +83,7 @@ public interface MetadataUpdater extends Closeable {
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
      * requests with special handling for completed receives of such requests.
      */
-    void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
-
-    /**
-     * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the
-     * start of the update if possible (see `maybeUpdate` for more information).
-     */
-    void requestUpdate();
+    void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
 
     /**
      * Close this updater.
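
Putting the renamed callbacks together, a bare-bones implementation of the reworked interface looks roughly like the sketch below (hypothetical class; `fetchNodes`, `isUpdateDue` and `maybeUpdate` are untouched by this patch and given placeholder bodies here):

    // Illustrative no-op updater against the post-patch interface; not part of the commit.
    public class NoOpMetadataUpdater implements MetadataUpdater {
        @Override
        public List<Node> fetchNodes() { return Collections.emptyList(); }
        @Override
        public boolean isUpdateDue(long now) { return false; }
        @Override
        public long maybeUpdate(long now) { return Long.MAX_VALUE; }
        @Override
        public void handleServerDisconnect(long now, String nodeId,
                Optional<AuthenticationException> maybeAuthException) { }
        @Override
        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) { }
        @Override
        public void handleSuccessfulResponse(RequestHeader requestHeader, long now,
                MetadataResponse metadataResponse) { }
        @Override
        public void close() { }
    }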
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d782df8..3431b83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -30,8 +30,8 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.CommonFields;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -51,11 +51,13 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -308,24 +310,29 @@ public class NetworkClient implements KafkaClient {
             return;
 
         selector.close(nodeId);
-        List<ApiKeys> requestTypes = new ArrayList<>();
         long now = time.milliseconds();
-        for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) {
-            if (request.isInternalRequest) {
-                if (request.header.apiKey() == ApiKeys.METADATA) {
-                    metadataUpdater.handleDisconnection(request.destination);
-                }
-            } else {
-                requestTypes.add(request.header.apiKey());
-                abortedSends.add(new ClientResponse(request.header,
-                        request.callback, request.destination, request.createdTimeMs, now,
-                        true, null, null, null));
-            }
-        }
+
+        cancelInFlightRequests(nodeId, now, abortedSends);
+
         connectionStates.disconnected(nodeId, now);
-        if (log.isDebugEnabled()) {
-            log.debug("Manually disconnected from {}. Removed requests: {}.", nodeId,
-                Utils.join(requestTypes, ", "));
+
+        if (log.isTraceEnabled()) {
+            log.trace("Manually disconnected from {}. Aborted in-flight requests: {}.", nodeId, inFlightRequests);
+        }
+    }
+
+    private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
+        Iterable<InFlightRequest> inFlightRequests = this.inFlightRequests.clearAll(nodeId);
+        for (InFlightRequest request : inFlightRequests) {
+            log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
+                    request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
+
+            if (!request.isInternalRequest) {
+                if (responses != null)
+                    responses.add(request.disconnected(now, null));
+            } else if (request.header.apiKey() == ApiKeys.METADATA) {
+                metadataUpdater.handleFailedRequest(now, Optional.empty());
+            }
         }
     }
 
@@ -339,9 +346,8 @@ public class NetworkClient implements KafkaClient {
     @Override
     public void close(String nodeId) {
         selector.close(nodeId);
-        for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
-            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
-                metadataUpdater.handleDisconnection(request.destination);
+        long now = time.milliseconds();
+        cancelInFlightRequests(nodeId, now, null);
         connectionStates.remove(nodeId);
     }
 
@@ -481,10 +487,11 @@ public class NetworkClient implements KafkaClient {
             ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                     clientRequest.callback(), clientRequest.destination(), now, now,
                     false, unsupportedVersionException, null, null);
-            abortedSends.add(clientResponse);
 
-            if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA)
-                metadataUpdater.handleFatalException(unsupportedVersionException);
+            if (!isInternalRequest)
+                abortedSends.add(clientResponse);
+            else if (clientRequest.apiKey() == ApiKeys.METADATA)
+                metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
         }
     }
 
@@ -735,7 +742,6 @@ public class NetworkClient implements KafkaClient {
             case AUTHENTICATION_FAILED:
                 AuthenticationException exception = disconnectState.exception();
                 connectionStates.authenticationFailed(nodeId, now, exception);
-                metadataUpdater.handleFatalException(exception);
                 log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                     disconnectState.remoteAddress(), exception.getMessage());
                 break;
@@ -752,14 +758,9 @@ public class NetworkClient implements KafkaClient {
             default:
                 break; // Disconnections in other states are logged at debug level in Selector
         }
-        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
-            log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
-                    request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
-            if (!request.isInternalRequest)
-                responses.add(request.disconnected(now, disconnectState.exception()));
-            else if (request.header.apiKey() == ApiKeys.METADATA)
-                metadataUpdater.handleDisconnection(request.destination);
-        }
+
+        cancelInFlightRequests(nodeId, now, responses);
+        metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
     }
 
     /**
@@ -777,10 +778,6 @@ public class NetworkClient implements KafkaClient {
             log.debug("Disconnecting from node {} due to request timeout.", nodeId);
             processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
         }
-
-        // we disconnected, so we should probably refresh our metadata
-        if (!nodeIds.isEmpty())
-            metadataUpdater.requestUpdate();
     }
 
     private void handleAbortedSends(List<ClientResponse> responses) {
@@ -844,7 +841,7 @@ public class NetworkClient implements KafkaClient {
                     parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
             maybeThrottle(body, req.header.apiVersion(), req.destination, now);
             if (req.isInternalRequest && body instanceof MetadataResponse)
-                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
+                metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
             else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                 handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
             else
@@ -894,9 +891,6 @@ public class NetworkClient implements KafkaClient {
             log.debug("Node {} disconnected.", node);
             processDisconnection(responses, node, now, entry.getValue());
         }
-        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
-        if (this.selector.disconnected().size() > 0)
-            metadataUpdater.requestUpdate();
     }
 
     /**
@@ -960,10 +954,10 @@ public class NetworkClient implements KafkaClient {
                     this.socketReceiveBuffer);
         } catch (IOException e) {
             log.warn("Error connecting to node {}", node, e);
-            /* attempt failed, we'll try again after the backoff */
+            // Attempt failed, we'll try again after the backoff
             connectionStates.disconnected(nodeConnectionId, now);
-            /* maybe the problem is our metadata, update it */
-            metadataUpdater.requestUpdate();
+            // Notify metadata updater of the connection failure
+            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
         }
     }
 
@@ -1001,7 +995,6 @@ public class NetworkClient implements KafkaClient {
             long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
 
             long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
-
             if (metadataTimeout > 0) {
                 return metadataTimeout;
             }
@@ -1018,31 +1011,39 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public void handleDisconnection(String destination) {
+        public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeFatalException) {
             Cluster cluster = metadata.fetch();
             // 'processDisconnection' generates warnings for misconfigured bootstrap server configuration
             // resulting in 'Connection Refused' and misconfigured security resulting in authentication failures.
             // The warning below handles the case where a connection to a broker was established, but was disconnected
             // before metadata could be obtained.
             if (cluster.isBootstrapConfigured()) {
-                int nodeId = Integer.parseInt(destination);
+                int nodeId = Integer.parseInt(destinationId);
                 Node node = cluster.nodeById(nodeId);
                 if (node != null)
                     log.warn("Bootstrap broker {} disconnected", node);
             }
 
-            inProgressRequestVersion = null;
+            // If we have a disconnect while an update is due, we treat it as a failed update
+            // so that we can backoff properly
+            if (isUpdateDue(now))
+                handleFailedRequest(now, Optional.empty());
+
+            maybeFatalException.ifPresent(metadata::fatalError);
+
+            // The disconnect may be the result of stale metadata, so request an update
+            metadata.requestUpdate();
         }
 
         @Override
-        public void handleFatalException(KafkaException fatalException) {
-            if (metadata.updateRequested())
-                metadata.failedUpdate(time.milliseconds(), fatalException);
+        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
+            maybeFatalException.ifPresent(metadata::fatalError);
+            metadata.failedUpdate(now);
             inProgressRequestVersion = null;
         }
 
         @Override
-        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
+        public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             // If any partition has leader with missing listeners, log up to ten of these partitions
             // for diagnosing broker configuration issues.
             // This could be a transient issue if listeners were added dynamically to brokers.
@@ -1066,7 +1067,7 @@ public class NetworkClient implements KafkaClient {
             // created which means we will get errors and no nodes until it exists
             if (response.brokers().isEmpty()) {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
-                this.metadata.failedUpdate(now, null);
+                this.metadata.failedUpdate(now);
             } else {
                 this.metadata.update(inProgressRequestVersion, response, now);
             }
@@ -1075,11 +1076,6 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public void requestUpdate() {
-            this.metadata.requestUpdate();
-        }
-
-        @Override
         public void close() {
             this.metadata.close();
         }
@@ -1104,10 +1100,10 @@ public class NetworkClient implements KafkaClient {
 
             if (canSendRequest(nodeConnectionId, now)) {
                 Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
-                this.inProgressRequestVersion = requestAndVersion.requestVersion;
                 MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
+                this.inProgressRequestVersion = requestAndVersion.requestVersion;
                 return defaultRequestTimeoutMs;
             }
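
The heart of the NPE fix is in `DefaultMetadataUpdater.handleServerDisconnect` above: a disconnect only counts as a failed metadata update when an update was actually due, so an authentication failure on an unrelated connection can no longer be mistaken for a failed in-flight metadata request. Condensed from the hunk above:

    if (isUpdateDue(now))                                  // an update was in progress or due,
        handleFailedRequest(now, Optional.empty());        // so back off before retrying
    maybeFatalException.ifPresent(metadata::fatalError);   // auth errors still surface to callers
    metadata.requestUpdate();                              // the disconnect may mean stale metadata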
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index b7080ac..6e83452 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Manages the metadata for KafkaAdminClient.
@@ -100,23 +101,19 @@ public class AdminMetadataManager {
         }
 
         @Override
-        public void handleDisconnection(String destination) {
-            // Do nothing
-        }
-
-        @Override
-        public void handleFatalException(KafkaException e) {
-            updateFailed(e);
+        public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeFatalException) {
+            maybeFatalException.ifPresent(AdminMetadataManager.this::updateFailed);
+            AdminMetadataManager.this.requestUpdate();
         }
 
         @Override
-        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse) {
+        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
             // Do nothing
         }
 
         @Override
-        public void requestUpdate() {
-            AdminMetadataManager.this.requestUpdate();
+        public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse) {
+            // Do nothing
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9d3cf6b..a431ab3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -734,7 +734,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     subscriptions, logContext, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.bootstrap(addresses, time.milliseconds());
+            this.metadata.bootstrap(addresses);
             String metricGrpPrefix = "consumer";
 
             FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4b7f3d2..51e1a8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -415,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         logContext,
                         clusterResourceListeners,
                         Time.SYSTEM);
-                this.metadata.bootstrap(addresses, time.milliseconds());
+                this.metadata.bootstrap(addresses);
             }
             this.errors = this.metrics.sensor("errors");
             this.sender = newSender(logContext, kafkaClient, this.metadata);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index ef53af4..da18189 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -111,10 +111,9 @@ public class ProducerMetadata extends Metadata {
     }
 
     @Override
-    public synchronized void failedUpdate(long now, KafkaException fatalException) {
-        super.failedUpdate(now, fatalException);
-        if (fatalException != null)
-            notifyAll();
+    public synchronized void fatalError(KafkaException fatalException) {
+        super.fatalError(fatalException);
+        notifyAll();
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7bda534..f690dc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -118,6 +118,18 @@ public class MetadataTest {
     }
 
     @Test
+    public void testUpdateMetadataAllowedImmediatelyAfterBootstrap() {
+        MockTime time = new MockTime();
+
+        Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(),
+                new ClusterResourceListeners());
+        metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9002)));
+
+        assertEquals(0, metadata.timeToAllowUpdate(time.milliseconds()));
+        assertEquals(0, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
     public void testTimeToNextUpdate() {
         checkTimeToNextUpdate(100, 1000);
         checkTimeToNextUpdate(1000, 100);
@@ -131,7 +143,7 @@ public class MetadataTest {
         long now = 10000;
 
         // lastRefreshMs updated to now.
-        metadata.failedUpdate(now, null);
+        metadata.failedUpdate(now);
 
         // Backing off. Remaining time until next try should be returned.
         assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
@@ -259,7 +271,7 @@ public class MetadataTest {
         metadata.update(emptyMetadataResponse(), time);
 
         assertEquals(100, metadata.timeToNextUpdate(1000));
-        metadata.failedUpdate(1100, null);
+        metadata.failedUpdate(1100);
 
         assertEquals(100, metadata.timeToNextUpdate(1100));
         assertEquals(100, metadata.lastSuccessfulUpdate());
@@ -270,14 +282,13 @@ public class MetadataTest {
 
     @Test
     public void testClusterListenerGetsNotifiedOfUpdate() {
-        long time = 0;
         MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
         ClusterResourceListeners listeners = new ClusterResourceListeners();
         listeners.maybeAdd(mockClusterListener);
         metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), listeners);
 
         String hostName = "www.example.com";
-        metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
+        metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)));
         assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
                 MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e198f6c..a4145d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
@@ -31,6 +34,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -48,11 +52,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -83,6 +89,12 @@ public class NetworkClientTest {
                 ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
+    private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
+        return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+                defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
+    }
+
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
@@ -532,16 +544,19 @@ public class NetworkClientTest {
     }
 
     private int sendEmptyProduceRequest() {
+        return sendEmptyProduceRequest(node.idString());
+    }
+
+    private int sendEmptyProduceRequest(String nodeId) {
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
                 Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true,
                 defaultRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         return request.correlationId();
     }
 
-
     private void sendResponse(ResponseHeader respHeader, Struct response) {
         Struct responseHeaderStruct = respHeader.toStruct();
         int size = responseHeaderStruct.sizeOf() + response.sizeOf();
@@ -588,6 +603,49 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testAuthenticationFailureWithInFlightMetadataRequest() {
+        int refreshBackoffMs = 50;
+
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(2, Collections.emptyMap());
+        Metadata metadata = new Metadata(refreshBackoffMs, 5000, new LogContext(), new ClusterResourceListeners());
+        metadata.update(metadataResponse, time.milliseconds());
+
+        Cluster cluster = metadata.fetch();
+        Node node1 = cluster.nodes().get(0);
+        Node node2 = cluster.nodes().get(1);
+
+        NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata);
+
+        awaitReady(client, node1);
+
+        metadata.requestUpdate();
+        time.sleep(refreshBackoffMs);
+
+        client.poll(0, time.milliseconds());
+
+        Optional<Node> nodeWithPendingMetadataOpt = cluster.nodes().stream()
+                .filter(node -> client.hasInFlightRequests(node.idString()))
+                .findFirst();
+        assertEquals(Optional.of(node1), nodeWithPendingMetadataOpt);
+
+        assertFalse(client.ready(node2, time.milliseconds()));
+        selector.serverAuthenticationFailed(node2.idString());
+        client.poll(0, time.milliseconds());
+        assertNotNull(client.authenticationException(node2));
+
+        ByteBuffer requestBuffer = selector.completedSendBuffers().get(0).buffer();
+        RequestHeader header = parseHeader(requestBuffer);
+        assertEquals(ApiKeys.METADATA, header.apiKey());
+
+        ByteBuffer responseBuffer = metadataResponse.serialize(ApiKeys.METADATA, header.apiVersion(), header.correlationId());
+        selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), responseBuffer)));
+
+        int initialUpdateVersion = metadata.updateVersion();
+        client.poll(0, time.milliseconds());
+        assertEquals(initialUpdateVersion + 1, metadata.updateVersion());
+    }
+
+    @Test
     public void testLeastLoadedNodeConsidersThrottledConnections() {
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
@@ -840,9 +898,18 @@ public class NetworkClientTest {
         }
 
         @Override
-        public void handleFatalException(KafkaException exception) {
-            failure = exception;
-            super.handleFatalException(exception);
+        public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeAuthException) {
+            maybeAuthException.ifPresent(exception -> {
+                failure = exception;
+            });
+            super.handleServerDisconnect(now, destinationId, maybeAuthException);
+        }
+
+        @Override
+        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
+            maybeFatalException.ifPresent(exception -> {
+                failure = exception;
+            });
         }
 
         public KafkaException getAndClearFailure() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 90b0fc5..c50bce4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -235,7 +235,7 @@ public class ConsumerNetworkClientTest {
 
     @Test
     public void testAuthenticationExceptionPropagatedFromMetadata() {
-        metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Authentication failed"));
+        metadata.fatalError(new AuthenticationException("Authentication failed"));
         try {
             consumerClient.poll(time.timer(Duration.ZERO));
             fail("Expected authentication error thrown");
@@ -264,7 +264,7 @@ public class ConsumerNetworkClientTest {
     @Test
     public void testMetadataFailurePropagated() {
         KafkaException metadataException = new KafkaException();
-        metadata.failedUpdate(time.milliseconds(), metadataException);
+        metadata.fatalError(metadataException);
         try {
             consumerClient.poll(time.timer(Duration.ZERO));
             fail("Expected poll to throw exception");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fd1be37..1aa7814 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3338,7 +3338,7 @@ public class FetcherTest {
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
         metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
-                ClientDnsLookup.DEFAULT), time.milliseconds());
+                ClientDnsLookup.DEFAULT));
 
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
index 3e29d61..58fbd44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -37,9 +36,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertThrows;
 
 public class ProducerMetadataTest {
 
@@ -178,9 +177,8 @@ public class ProducerMetadataTest {
     }
 
     @Test
-    public void testMetadataWaitAbortedOnFatalException() throws Exception {
-        Time time = new MockTime();
-        metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Fatal exception from test"));
+    public void testMetadataWaitAbortedOnFatalException() {
+        metadata.fatalError(new AuthenticationException("Fatal exception from test"));
         assertThrows(AuthenticationException.class, () -> metadata.awaitUpdate(0, 1000));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 5da9f04..0a93c6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
@@ -35,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,9 +47,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 public class ClientAuthenticationFailureTest {
     private static MockTime time = new MockTime(50);
@@ -88,13 +91,10 @@ public class ClientAuthenticationFailureTest {
         StringDeserializer deserializer = new StringDeserializer();
 
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
-            consumer.subscribe(Arrays.asList(topic));
-            consumer.poll(Duration.ofSeconds(10));
-            fail("Expected an authentication error!");
-        } catch (SaslAuthenticationException e) {
-            // OK
-        } catch (Exception e) {
-            throw new AssertionError("Expected only an authentication error, but another error occurred.", e);
+            assertThrows(SaslAuthenticationException.class, () -> {
+                consumer.subscribe(Collections.singleton(topic));
+                consumer.poll(Duration.ofSeconds(10));
+            });
         }
     }
 
@@ -106,11 +106,8 @@ public class ClientAuthenticationFailureTest {
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
             ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
-            producer.send(record).get();
-            fail("Expected an authentication error!");
-        } catch (Exception e) {
-            assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
-                    e.getCause() instanceof SaslAuthenticationException);
+            Future<RecordMetadata> future = producer.send(record);
+            TestUtils.assertFutureThrows(future, SaslAuthenticationException.class);
         }
     }
 
@@ -119,12 +116,8 @@ public class ClientAuthenticationFailureTest {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
         try (Admin client = Admin.create(props)) {
-            DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
-            result.all().get();
-            fail("Expected an authentication error!");
-        } catch (Exception e) {
-            assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
-                    e.getCause() instanceof SaslAuthenticationException);
+            KafkaFuture<Map<String, TopicDescription>> future = client.describeTopics(Collections.singleton("test")).all();
+            TestUtils.assertFutureThrows(future, SaslAuthenticationException.class);
         }
     }
 
@@ -137,10 +130,7 @@ public class ClientAuthenticationFailureTest {
         StringSerializer serializer = new StringSerializer();
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
-            producer.initTransactions();
-            fail("Expected an authentication error!");
-        } catch (SaslAuthenticationException e) {
-            // expected exception
+            assertThrows(SaslAuthenticationException.class, producer::initTransactions);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 9f3c432..90a83b0 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.requests.ByteBufferChannel;
@@ -88,13 +88,15 @@ public class MockSelector implements Selectable {
         close(id);
     }
 
+    public void serverAuthenticationFailed(String id) {
+        ChannelState authFailed = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
+                new AuthenticationException("Authentication failed"), null);
+        this.disconnected.put(id, authFailed);
+        close(id);
+    }
+
     private void removeSendsForNode(String id, Collection<Send> sends) {
-        Iterator<Send> iter = sends.iterator();
-        while (iter.hasNext()) {
-            Send send = iter.next();
-            if (id.equals(send.destination()))
-                iter.remove();
-        }
+        sends.removeIf(send -> id.equals(send.destination()));
     }
 
     public void clear() {
@@ -153,10 +155,6 @@ public class MockSelector implements Selectable {
         return completedSends;
     }
 
-    public void completeSend(NetworkSend send) {
-        this.completedSends.add(send);
-    }
-
     public List<ByteBufferChannel> completedSendBuffers() {
         return completedSendBuffers;
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index cc052c2..13a52d4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -96,7 +96,7 @@ public class WorkerGroupMember {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.bootstrap(addresses, time.milliseconds());
+            this.metadata.bootstrap(addresses);
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
             NetworkClient netClient = new NetworkClient(
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index b97d858..92cdb9e 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -278,7 +278,7 @@ object BrokerApiVersionsCommand {
       val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
       val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
-      metadata.bootstrap(brokerAddresses, time.milliseconds())
+      metadata.bootstrap(brokerAddresses)
 
       val selector = new Selector(
         DefaultConnectionMaxIdleMs,