You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/04 09:23:18 UTC

[kafka] branch trunk updated: MINOR: Fix bug in AdminClient node reassignment following connection failure (#5112)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d02f021  MINOR: Fix bug in AdminClient node reassignment following connection failure (#5112)
d02f021 is described below

commit d02f02130eb36cdc4375eb40bdbe4a1f9b58fc60
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 4 02:23:05 2018 -0700

    MINOR: Fix bug in AdminClient node reassignment following connection failure (#5112)
    
    We added logic to reassign nodes in callsToSend after a connection failure, but we do not handle the case when there is no node currently available to reassign the request to. This can happen when using MetadataUpdateNodeIdProvider if all of the known nodes are blacked out awaiting the retry backoff. To fix this, we need to ensure that the call is added to pendingCalls if a new node cannot be found.
---
 .../org/apache/kafka/clients/ClientResponse.java   |   8 ++
 .../org/apache/kafka/clients/NetworkClient.java    |  23 ++--
 .../kafka/clients/admin/KafkaAdminClient.java      |  88 +++++++++------
 .../consumer/internals/ConsumerNetworkClient.java  |  11 +-
 .../java/org/apache/kafka/clients/MockClient.java  | 107 ++++++++++++------
 .../clients/admin/AdminClientUnitTestEnv.java      |   8 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  60 +++++++++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 122 +++++++++------------
 .../internals/AbstractCoordinatorTest.java         |  14 +--
 .../internals/ConsumerCoordinatorTest.java         |  14 +--
 .../scala/kafka/common/InterBrokerSendThread.scala |  14 ++-
 .../TransactionCoordinatorConcurrencyTest.scala    |   2 +-
 .../TransactionMarkerChannelManagerTest.scala      |   6 +-
 ...sactionMarkerRequestCompletionHandlerTest.scala |  12 +-
 14 files changed, 298 insertions(+), 191 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 0ff30e9..446bf44 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -33,6 +34,7 @@ public class ClientResponse {
     private final long latencyMs;
     private final boolean disconnected;
     private final UnsupportedVersionException versionMismatch;
+    private final AuthenticationException authenticationException;
     private final AbstractResponse responseBody;
 
     /**
@@ -53,6 +55,7 @@ public class ClientResponse {
                           long receivedTimeMs,
                           boolean disconnected,
                           UnsupportedVersionException versionMismatch,
+                          AuthenticationException authenticationException,
                           AbstractResponse responseBody) {
         this.requestHeader = requestHeader;
         this.callback = callback;
@@ -61,6 +64,7 @@ public class ClientResponse {
         this.latencyMs = receivedTimeMs - createdTimeMs;
         this.disconnected = disconnected;
         this.versionMismatch = versionMismatch;
+        this.authenticationException = authenticationException;
         this.responseBody = responseBody;
     }
 
@@ -76,6 +80,10 @@ public class ClientResponse {
         return versionMismatch;
     }
 
+    public AuthenticationException authenticationException() {
+        return authenticationException;
+    }
+
     public RequestHeader requestHeader() {
         return requestHeader;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 26fa467..7c87277 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -260,7 +260,7 @@ public class NetworkClient implements KafkaClient {
                 requestTypes.add(request.header.apiKey());
                 abortedSends.add(new ClientResponse(request.header,
                         request.callback, request.destination, request.createdTimeMs, now,
-                        true, null, null));
+                        true, null, null, null));
             }
         }
         connectionStates.disconnected(nodeId, now);
@@ -413,14 +413,14 @@ public class NetworkClient implements KafkaClient {
             // The call to build may also throw UnsupportedVersionException, if there are essential
             // fields that cannot be represented in the chosen version.
             doSend(clientRequest, isInternalRequest, now, builder.build(version));
-        } catch (UnsupportedVersionException e) {
+        } catch (UnsupportedVersionException unsupportedVersionException) {
             // If the version is not supported, skip sending the request over the wire.
             // Instead, simply add it to the local queue of aborted requests.
             log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
-                    clientRequest.correlationId(), clientRequest.destination(), e);
+                    clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
             ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                     clientRequest.callback(), clientRequest.destination(), now, now,
-                    false, e, null);
+                    false, unsupportedVersionException, null, null);
             abortedSends.add(clientResponse);
         }
     }
@@ -615,7 +615,10 @@ public class NetworkClient implements KafkaClient {
      * @param nodeId Id of the node to be disconnected
      * @param now The current time
      */
-    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
+    private void processDisconnection(List<ClientResponse> responses,
+                                      String nodeId,
+                                      long now,
+                                      ChannelState disconnectState) {
         connectionStates.disconnected(nodeId, now);
         apiVersions.remove(nodeId);
         nodesNeedingApiVersionsFetch.remove(nodeId);
@@ -641,7 +644,7 @@ public class NetworkClient implements KafkaClient {
             log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
                     request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
             if (!request.isInternalRequest)
-                responses.add(request.disconnected(now));
+                responses.add(request.disconnected(now, disconnectState.exception()));
             else if (request.header.apiKey() == ApiKeys.METADATA)
                 metadataUpdater.handleDisconnection(request.destination);
         }
@@ -1034,11 +1037,13 @@ public class NetworkClient implements KafkaClient {
         }
 
         public ClientResponse completed(AbstractResponse response, long timeMs) {
-            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, null, response);
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
+                    false, null, null, response);
         }
 
-        public ClientResponse disconnected(long timeMs) {
-            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null, null);
+        public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) {
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
+                    true, null, authenticationException, null);
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6b02e11..450de06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -138,6 +138,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -820,39 +821,50 @@ public class KafkaAdminClient extends AdminClient {
          * Choose nodes for the calls in the pendingCalls list.
          *
          * @param now           The current time in milliseconds.
-         * @param pendingIter   An iterator yielding pending calls.
+         * @return              The minimum time until a call is ready to be retried if any of the pending
+         *                      calls are backing off after a failure
          */
-        private long chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
+        private long maybeDrainPendingCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
-            log.trace("Trying to choose nodes for {} at {}", pendingIter, now);
+            log.trace("Trying to choose nodes for {} at {}", pendingCalls, now);
+
+            Iterator<Call> pendingIter = pendingCalls.iterator();
             while (pendingIter.hasNext()) {
                 Call call = pendingIter.next();
 
                 // If the call is being retried, await the proper backoff before finding the node
                 if (now < call.nextAllowedTryMs) {
                     pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs - now);
-                    continue;
-                }
-
-                Node node = null;
-                try {
-                    node = call.nodeProvider.provide();
-                } catch (Throwable t) {
-                    // Handle authentication errors while choosing nodes.
-                    log.debug("Unable to choose node for {}", call, t);
+                } else if (maybeDrainPendingCall(call, now)) {
                     pendingIter.remove();
-                    call.fail(now, t);
                 }
+            }
+            return pollTimeout;
+        }
+
+        /**
+         * Check whether a pending call can be assigned a node. Return true if the pending call was either
+         * transferred to the callsToSend collection or if the call was failed. Return false if it
+         * should remain pending.
+         */
+        private boolean maybeDrainPendingCall(Call call, long now) {
+            try {
+                Node node = call.nodeProvider.provide();
                 if (node != null) {
                     log.trace("Assigned {} to node {}", call, node);
-                    pendingIter.remove();
                     call.curNode = node;
                     getOrCreateListValue(callsToSend, node).add(call);
+                    return true;
                 } else {
                     log.trace("Unable to assign {} to a node.", call);
+                    return false;
                 }
+            } catch (Throwable t) {
+                // Handle authentication errors while choosing nodes.
+                log.debug("Unable to choose node for {}", call, t);
+                call.fail(now, t);
+                return true;
             }
-            return pollTimeout;
         }
 
         /**
@@ -992,26 +1004,25 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         /**
-         * Reassign calls that have not yet been sent. When metadata is refreshed,
-         * all unsent calls are reassigned to handle controller change and node changes.
-         * When a node is disconnected, all calls assigned to the node are reassigned.
+         * Unassign calls that have not yet been sent based on some predicate. For example, this
+         * is used to reassign the calls that have been assigned to a disconnected node.
          *
-         * @param now The current time in milliseconds
-         * @param disconnectedOnly Reassign only calls to nodes that were disconnected
-         *                         in the last poll
+         * @param shouldUnassign Condition for reassignment. If the predicate is true, then the calls will
+         *                       be put back in the pendingCalls collection and they will be reassigned
          */
-        private void reassignUnsentCalls(long now, boolean disconnectedOnly) {
-            ArrayList<Call> pendingCallsToSend = new ArrayList<>();
+        private void unassignUnsentCalls(Predicate<Node> shouldUnassign) {
             for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
                 Map.Entry<Node, List<Call>> entry = iter.next();
-                if (!disconnectedOnly || client.connectionFailed(entry.getKey())) {
-                    for (Call call : entry.getValue()) {
-                        pendingCallsToSend.add(call);
-                    }
+                Node node = entry.getKey();
+                List<Call> awaitingCalls = entry.getValue();
+
+                if (awaitingCalls.isEmpty()) {
+                    iter.remove();
+                } else if (shouldUnassign.test(node)) {
+                    pendingCalls.addAll(awaitingCalls);
                     iter.remove();
                 }
             }
-            chooseNodesForPendingCalls(now, pendingCallsToSend.iterator());
         }
 
         private boolean hasActiveExternalCalls(Collection<Call> calls) {
@@ -1076,29 +1087,37 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Choose nodes for our pending calls.
-                pollTimeout = Math.min(pollTimeout, chooseNodesForPendingCalls(now, pendingCalls.iterator()));
+                pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
                 long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
                 if (metadataFetchDelayMs == 0) {
                     metadataManager.transitionToUpdatePending(now);
                     Call metadataCall = makeMetadataCall(now);
                     // Create a new metadata fetch call and add it to the end of pendingCalls.
                     // Assign a node for just the new call (we handled the other pending nodes above).
-                    pendingCalls.add(metadataCall);
-                    chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1));
+
+                    if (!maybeDrainPendingCall(metadataCall, now))
+                        pendingCalls.add(metadataCall);
                 }
                 pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
+
                 if (metadataFetchDelayMs > 0) {
                     pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                 }
 
+                // Ensure that we use a small poll timeout if there are pending calls which need to be sent
+                if (!pendingCalls.isEmpty())
+                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
+
                 // Wait for network responses.
                 log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
                 List<ClientResponse> responses = client.poll(pollTimeout, now);
                 log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
 
+                // unassign calls to disconnected nodes
+                unassignUnsentCalls(client::connectionFailed);
+
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
-                reassignUnsentCalls(now, true); // reassign calls to disconnected nodes
                 handleResponses(now, responses);
             }
             int numTimedOut = 0;
@@ -1184,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient {
                     MetadataResponse response = (MetadataResponse) abstractResponse;
                     long now = time.milliseconds();
                     metadataManager.update(response.cluster(), now);
-                    reassignUnsentCalls(now, false);
+
+                    // Unassign all unsent requests after a metadata refresh to allow for a new
+                    // destination to be selected from the new metadata
+                    unassignUnsentCalls(node -> true);
                 }
 
                 @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b718d63..a9e167a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -413,12 +413,9 @@ public class ConsumerNetworkClient implements Closeable {
                 for (ClientRequest request : requests) {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
                     AuthenticationException authenticationException = client.authenticationException(node);
-                    if (authenticationException != null)
-                        handler.onFailure(authenticationException);
-                    else
-                        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
+                    handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
                             request.callback(), request.destination(), request.createdTimeMs(), now, true,
-                            null, null));
+                            null, authenticationException, null));
                 }
             }
         }
@@ -571,6 +568,8 @@ public class ConsumerNetworkClient implements Closeable {
         public void fireCompletion() {
             if (e != null) {
                 future.raise(e);
+            } else if (response.authenticationException() != null) {
+                future.raise(response.authenticationException());
             } else if (response.wasDisconnected()) {
                 log.debug("Cancelled request with header {} due to node {} being disconnected",
                         response.requestHeader(), response.destination());
@@ -611,7 +610,7 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /*
-     * A threadsafe helper class to hold requests per node that have not been sent yet
+     * A thread-safe helper class to hold requests per node that have not been sent yet
      */
     private final static class UnsentRequests {
         private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 7c2fc0b..2a1e213 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -78,7 +78,12 @@ public class MockClient implements KafkaClient {
     private Cluster cluster;
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
-    private final Map<Node, Long> blackedOut = new HashMap<>();
+
+    // Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
+    private final TransientSet<Node> blackedOut;
+    // Nodes which will always fail to connect, but can be chosen by leastLoadedNode
+    private final TransientSet<Node> unreachable;
+
     private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
     private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
@@ -98,6 +103,8 @@ public class MockClient implements KafkaClient {
         this.time = time;
         this.metadata = metadata;
         this.unavailableTopics = Collections.emptySet();
+        this.blackedOut = new TransientSet<>(time);
+        this.unreachable = new TransientSet<>(time);
     }
 
     @Override
@@ -107,19 +114,21 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node))
+        if (blackedOut.contains(node, now))
+            return false;
+
+        if (unreachable.contains(node, now)) {
+            blackout(node, 100);
             return false;
-        authenticationErrors.remove(node);
+        }
+
         ready.add(node.idString());
         return true;
     }
 
     @Override
     public long connectionDelay(Node node, long now) {
-        Long blackoutExpiration = blackedOut.get(node);
-        if (blackoutExpiration != null)
-            return Math.max(0, blackoutExpiration - now);
-        return 0;
+        return blackedOut.expirationDelayMs(node, now);
     }
 
     @Override
@@ -127,37 +136,29 @@ public class MockClient implements KafkaClient {
         return connectionDelay(node, now);
     }
 
-    public void blackout(Node node, long duration) {
-        blackedOut.put(node, time.milliseconds() + duration);
+    public void blackout(Node node, long durationMs) {
+        blackedOut.add(node, durationMs);
+    }
+
+    public void setUnreachable(Node node, long durationMs) {
+        disconnect(node.idString());
+        unreachable.add(node, durationMs);
     }
 
-    public void authenticationFailed(Node node, long duration) {
+    public void authenticationFailed(Node node, long blackoutMs) {
         pendingAuthenticationErrors.remove(node);
         authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
         disconnect(node.idString());
-        blackout(node, duration);
+        blackout(node, blackoutMs);
     }
 
     public void createPendingAuthenticationError(Node node, long blackoutMs) {
         pendingAuthenticationErrors.put(node, blackoutMs);
     }
 
-    private boolean isBlackedOut(Node node) {
-        if (blackedOut.containsKey(node)) {
-            long expiration = blackedOut.get(node);
-            if (time.milliseconds() > expiration) {
-                blackedOut.remove(node);
-                return false;
-            } else {
-                return true;
-            }
-        }
-        return false;
-    }
-
     @Override
     public boolean connectionFailed(Node node) {
-        return isBlackedOut(node);
+        return blackedOut.contains(node);
     }
 
     @Override
@@ -174,7 +175,7 @@ public class MockClient implements KafkaClient {
             if (request.destination().equals(node)) {
                 short version = request.requestBuilder().latestAllowedVersion();
                 responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                        request.createdTimeMs(), now, true, null, null));
+                        request.createdTimeMs(), now, true, null, null, null));
                 iter.remove();
             }
         }
@@ -198,7 +199,8 @@ public class MockClient implements KafkaClient {
                 short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
                     builder.latestAllowedVersion());
                 ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                    request.createdTimeMs(), time.milliseconds(), true, null, null);
+                    request.createdTimeMs(), time.milliseconds(), true, null,
+                        new AuthenticationException("Authentication failed"), null);
                 responses.add(resp);
                 return;
             }
@@ -223,7 +225,7 @@ public class MockClient implements KafkaClient {
 
             ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
-                    unsupportedVersionException, futureResp.responseBody);
+                    unsupportedVersionException, null, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();
             return;
@@ -320,7 +322,7 @@ public class MockClient implements KafkaClient {
         requests.remove(clientRequest);
         short version = clientRequest.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
-                clientRequest.createdTimeMs(), time.milliseconds(), false, null, response));
+                clientRequest.createdTimeMs(), time.milliseconds(), false, null, null, response));
     }
 
 
@@ -328,7 +330,7 @@ public class MockClient implements KafkaClient {
         ClientRequest request = requests.remove();
         short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
+                request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
     }
 
     public void respondFrom(AbstractResponse response, Node node) {
@@ -343,7 +345,7 @@ public class MockClient implements KafkaClient {
                 iterator.remove();
                 short version = request.requestBuilder().latestAllowedVersion();
                 responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                        request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
+                        request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
                 return;
             }
         }
@@ -420,6 +422,7 @@ public class MockClient implements KafkaClient {
     public void reset() {
         ready.clear();
         blackedOut.clear();
+        unreachable.clear();
         requests.clear();
         responses.clear();
         futureResponses.clear();
@@ -511,6 +514,9 @@ public class MockClient implements KafkaClient {
 
     @Override
     public Node leastLoadedNode(long now) {
+        // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
+        if (blackedOut.contains(node, now))
+            return null;
         return this.node;
     }
 
@@ -539,4 +545,43 @@ public class MockClient implements KafkaClient {
         }
     }
 
+    private static class TransientSet<T> {
+        // The elements in the set mapped to their expiration timestamps
+        private final Map<T, Long> elements = new HashMap<>();
+        private final Time time;
+
+        private TransientSet(Time time) {
+            this.time = time;
+        }
+
+        boolean contains(T element) {
+            return contains(element, time.milliseconds());
+        }
+
+        boolean contains(T element, long now) {
+            return expirationDelayMs(element, now) > 0;
+        }
+
+        void add(T element, long durationMs) {
+            elements.put(element, time.milliseconds() + durationMs);
+        }
+
+        long expirationDelayMs(T element, long now) {
+            Long expirationTimeMs = elements.get(element);
+            if (expirationTimeMs == null) {
+                return 0;
+            } else if (now > expirationTimeMs) {
+                elements.remove(element);
+                return 0;
+            } else {
+                return expirationTimeMs - now;
+            }
+        }
+
+        void clear() {
+            elements.clear();
+        }
+
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index f862c14..3cd807d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -54,12 +54,16 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         this(newMockClient(time, cluster), time, cluster, config);
     }
 
+    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster) {
+        this(mockClient, time, cluster, newStrMap());
+    }
+
     private static MockClient newMockClient(Time time, Cluster cluster) {
         MockClient mockClient = new MockClient(time);
         mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
             cluster.clusterResource().clusterId(),
-            cluster.controller().id(),
-            Collections.<MetadataResponse.TopicMetadata>emptyList()));
+            cluster.controller() == null ? MetadataResponse.NO_CONTROLLER_ID : cluster.controller().id(),
+            Collections.emptyList()));
         return mockClient;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0bc786b..c4af2c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -183,8 +183,8 @@ public class KafkaAdminClientTest {
         nodes.put(1, new Node(1, "localhost", 8122));
         nodes.put(2, new Node(2, "localhost", 8123));
         return new Cluster("mockClusterId", nodes.values(),
-                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
-                Collections.<String>emptySet(), nodes.get(controllerIndex));
+                Collections.emptySet(), Collections.emptySet(),
+                Collections.emptySet(), nodes.get(controllerIndex));
     }
 
     private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
@@ -192,7 +192,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testCloseAdminClient() throws Exception {
+    public void testCloseAdminClient() {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
         }
     }
@@ -232,6 +232,60 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testConnectionFailureOnMetadataUpdate() throws Exception {
+        // This tests the scenario in which we successfully connect to the bootstrap server, but
+        // the server disconnects before sending the full response
+
+        Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.setNodeApiVersions(NodeApiVersions.create());
+        mockClient.setNode(cluster.nodes().get(0));
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+            Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
+            env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
+                    new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
+                            1, Collections.emptyList()));
+            env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
+                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all();
+
+            future.get();
+        }
+    }
+
+    @Test
+    public void testUnreachableBootstrapServer() throws Exception {
+        // This tests the scenario in which the bootstrap server is unreachable for a short while,
+        // which prevents AdminClient from being able to send the initial metadata request
+
+        Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.setNodeApiVersions(NodeApiVersions.create());
+        mockClient.setNode(cluster.nodes().get(0));
+        mockClient.setUnreachable(cluster.nodes().get(0), 200);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+            Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
+                    new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
+                            1, Collections.emptyList()));
+            env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
+                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all();
+
+            future.get();
+        }
+    }
+
     /**
      * Test that we propagate exceptions encountered when fetching metadata.
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4886c6b..4be6884 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -333,7 +333,6 @@ public class KafkaConsumerTest {
         return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
-
     @Test
     public void verifyHeartbeatSent() throws Exception {
         Time time = new MockTime();
@@ -1215,7 +1214,7 @@ public class KafkaConsumerTest {
         Set<TopicPartition> partitions = Utils.mkSet(tp0, tp1);
         consumer.assign(partitions);
         // verify consumer's assignment
-        assertTrue(consumer.assignment().equals(partitions));
+        assertEquals(partitions, consumer.assignment());
 
         consumer.pause(partitions);
         consumer.seekToEnd(partitions);
@@ -1491,8 +1490,52 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
+    @Test(expected = AuthenticationException.class)
+    public void testPartitionsForAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.partitionsFor("some other topic");
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testBeginningOffsetsAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.beginningOffsets(Collections.singleton(tp0));
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testEndOffsetsAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.endOffsets(Collections.singleton(tp0));
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testPollAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.subscribe(singleton(topic));
+        consumer.poll(Duration.ZERO);
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testOffsetsForTimesAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.offsetsForTimes(singletonMap(tp0, 0L));
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testCommitSyncAuthenticationFailure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(10L));
+        consumer.commitSync(offsets);
+    }
+
+    @Test(expected = AuthenticationException.class)
+    public void testCommittedAuthenticationFaiure() {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication();
+        consumer.committed(tp0);
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingAuthentication() {
         Time time = new MockTime();
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
@@ -1500,70 +1543,14 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
-        client.authenticationFailed(node, 300);
         PartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
-        consumer.subscribe(Collections.singleton(topic));
-        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
-
-        time.sleep(30); // wait less than the blackout period
-        assertTrue(client.connectionFailed(node));
-        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
-
-        client.requests().clear();
-        consumer.close(0, TimeUnit.MILLISECONDS);
-    }
-
-    private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
-        try {
-            consumer.partitionsFor("some other topic");
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        try {
-            consumer.beginningOffsets(Collections.singleton(partition));
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        try {
-            consumer.endOffsets(Collections.singleton(partition));
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        try {
-            consumer.poll(Duration.ZERO);
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
-        offset.put(partition, new OffsetAndMetadata(10L));
-
-        try {
-            consumer.commitSync(offset);
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        try {
-            consumer.committed(partition);
-            fail("Expected an authentication error!");
-        } catch (AuthenticationException e) {
-            // OK
-        }
+        client.createPendingAuthenticationError(node, 0);
+        return newConsumer(time, client, metadata, assignor, false);
     }
 
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
@@ -1724,12 +1711,11 @@ public class KafkaConsumerTest {
                     builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                 records = builder.build();
             }
-            tpResponses.put(partition,
-                            new FetchResponse.PartitionData(
-                                    Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                                    0L, null, records));
+            tpResponses.put(partition, new FetchResponse.PartitionData<>(
+                    Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+                    0L, null, records));
         }
-        return new FetchResponse(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
+        return new FetchResponse<>(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
     }
 
     private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 135763d..32aae44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -521,20 +521,10 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() {
+    public void testAuthenticationErrorInEnsureCoordinatorReady() {
         setupCoordinator(RETRY_BACKOFF_MS);
 
-        mockClient.authenticationFailed(node, 300);
-
-        try {
-            coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
-            fail("Expected an authentication error.");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        mockTime.sleep(30); // wait less than the blackout period
-        assertTrue(mockClient.connectionFailed(node));
+        mockClient.createPendingAuthenticationError(node, 300);
 
         try {
             coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1828873..32da34a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1572,18 +1572,8 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() {
-        client.authenticationFailed(node, 300);
-
-        try {
-            coordinator.ensureActiveGroup();
-            fail("Expected an authentication error.");
-        } catch (AuthenticationException e) {
-            // OK
-        }
-
-        time.sleep(30); // wait less than the blackout period
-        assertTrue(client.connectionFailed(node));
+    public void testAuthenticationFailureInEnsureActiveGroup() {
+        client.createPendingAuthenticationError(node, 300);
 
         try {
             coordinator.ensureActiveGroup();
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index c65e557..60635f1 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -22,6 +22,7 @@ import java.util.Map.Entry
 import kafka.utils.ShutdownableThread
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.utils.Time
@@ -106,9 +107,10 @@ abstract class InterBrokerSendThread(name: String,
       if (!requests.isEmpty && networkClient.connectionFailed(node)) {
         iterator.remove()
         for (request <- requests.asScala) {
-          if (networkClient.authenticationException(node) != null)
+          val authenticationException = networkClient.authenticationException(node)
+          if (authenticationException != null)
             error(s"Failed to send the following request due to authentication error: $request")
-          completeWithDisconnect(request, now)
+          completeWithDisconnect(request, now, authenticationException)
         }
       }
     }
@@ -119,15 +121,17 @@ abstract class InterBrokerSendThread(name: String,
     val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs)
     for (request <- expiredRequests.asScala) {
       debug(s"Failed to send the following request after $unsentExpiryMs ms: $request")
-      completeWithDisconnect(request, now)
+      completeWithDisconnect(request, now, null)
     }
   }
 
-  def completeWithDisconnect(request: ClientRequest, now: Long): Unit = {
+  def completeWithDisconnect(request: ClientRequest,
+                             now: Long,
+                             authenticationException: AuthenticationException): Unit = {
     val handler = request.callback
     handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
       handler, request.destination, now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ ,
-      null /* versionMismatch */ , null /* responseBody */))
+      null /* versionMismatch */ , authenticationException, null))
   }
 
   def wakeup(): Unit = networkClient.wakeup()
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 046741a..6168077 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -181,7 +181,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
         val request = requestAndHandler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()
         val response = createResponse(request)
         requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-          null, null, 0, 0, false, null, response))
+          null, null, 0, 0, false, null, null, response))
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index a039c53..454b361 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -302,7 +302,7 @@ class TransactionMarkerChannelManagerTest {
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
       requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-        null, null, 0, 0, false, null, response))
+        null, null, 0, 0, false, null, null, response))
     }
 
     EasyMock.verify(txnStateManager)
@@ -351,7 +351,7 @@ class TransactionMarkerChannelManagerTest {
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
       requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-        null, null, 0, 0, false, null, response))
+        null, null, 0, 0, false, null, null, response))
     }
 
     EasyMock.verify(txnStateManager)
@@ -406,7 +406,7 @@ class TransactionMarkerChannelManagerTest {
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
       requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-        null, null, 0, 0, false, null, response))
+        null, null, 0, 0, false, null, null, response))
     }
 
     // call this again so that append log will be retried
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 41ec159..3ca6c1b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -72,7 +72,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
     EasyMock.replay(markerChannelManager)
 
     handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-      null, null, 0, 0, true, null, null))
+      null, null, 0, 0, true, null, null, null))
 
     EasyMock.verify(markerChannelManager)
   }
@@ -86,7 +86,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
     try {
       handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-        null, null, 0, 0, false, null, response))
+        null, null, 0, 0, false, null, null, response))
       fail("should have thrown illegal argument exception")
     } catch {
       case _: IllegalStateException => // ok
@@ -204,7 +204,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
     val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
     handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-      null, null, 0, 0, false, null, response))
+      null, null, 0, 0, false, null, null, response))
 
     assertEquals(txnMetadata.topicPartitions, mutable.Set[TopicPartition](topicPartition))
     EasyMock.verify(markerChannelManager)
@@ -216,7 +216,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
     val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
     try {
       handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-        null, null, 0, 0, false, null, response))
+        null, null, 0, 0, false, null, null, response))
       fail("should have thrown illegal state exception")
     } catch {
       case _: IllegalStateException => // ok
@@ -237,7 +237,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
     val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
     handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-      null, null, 0, 0, false, null, response))
+      null, null, 0, 0, false, null, null, response))
 
     assertTrue(txnMetadata.topicPartitions.isEmpty)
     assertTrue(completed)
@@ -257,7 +257,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
     val response = new WriteTxnMarkersResponse(createProducerIdErrorMap(error))
     handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
-      null, null, 0, 0, false, null, response))
+      null, null, 0, 0, false, null, null, response))
 
     assertTrue(removed)
   }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.