Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:52 UTC

[36/50] [abbrv] kafka git commit: KAFKA-3488; Avoid failing unsent requests in the consumer where possible

KAFKA-3488; Avoid failing unsent requests in the consumer where possible

Do not fail unsent requests eagerly on every poll. A request that could not be sent is now failed only after it has been queued for longer than request.timeout.ms, or immediately when the group coordinator it targets is marked dead; the retriable SendFailedException is removed.
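
To make the behaviour change concrete, here is a minimal, illustrative sketch (not part of this
commit) of how a caller of the internal ConsumerNetworkClient sees it. The consumerClient, node
and heartbeatRequest objects are assumed to be set up as in ConsumerNetworkClientTest in the
diff below; only the send/poll/future calls shown here come from the changed code.

    // Illustrative sketch only; identifiers mirror the diff and tests below.
    RequestFuture<ClientResponse> future = consumerClient.send(node, ApiKeys.HEARTBEAT, heartbeatRequest);
    consumerClient.poll(future);   // block until the future completes

    if (future.succeeded()) {
        ClientResponse response = future.value();
        // ... handle the response ...
    } else {
        // An unsent request now surfaces as a TimeoutException once it has waited longer than
        // request.timeout.ms, or as GroupCoordinatorNotAvailableException if the coordinator
        // was marked dead, so callers such as AdminClient no longer need a retry loop on
        // SendFailedException.
        throw future.exception();
    }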

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1183 from rajinisivaram/KAFKA-3488


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c34df15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c34df15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c34df15

Branch: refs/heads/0.10.0
Commit: 9c34df1511a769b272893b75ec1ed90d38cc9576
Parents: 8dbd688
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Apr 7 15:48:50 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 7 15:48:50 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../consumer/internals/AbstractCoordinator.java |  2 +
 .../consumer/internals/ConsumerCoordinator.java |  3 -
 .../internals/ConsumerNetworkClient.java        | 58 +++++++++++-------
 .../consumer/internals/SendFailedException.java | 27 ---------
 .../GroupCoordinatorNotAvailableException.java  |  1 +
 .../internals/ConsumerCoordinatorTest.java      |  2 +-
 .../internals/ConsumerNetworkClientTest.java    | 63 +++++++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java |  2 +-
 .../runtime/distributed/WorkerGroupMember.java  |  3 +-
 .../distributed/WorkerCoordinatorTest.java      |  2 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 23 +++----
 12 files changed, 118 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c457c83..5576431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -584,7 +584,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c79d8e7..1e6757e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -521,6 +522,7 @@ public abstract class AbstractCoordinator implements Closeable {
     protected void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
+            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
             this.coordinator = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a364987..86b60d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,9 +422,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                     if (exception == null) {
                         reschedule(now + interval);
-                    } else if (exception instanceof SendFailedException) {
-                        log.debug("Failed to send automatic offset commit for group {}", groupId);
-                        reschedule(now);
                     } else {
                         log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                         reschedule(now + interval);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b70994d..4119954 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,12 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Higher level consumer access to the network layer with basic support for futures and
  * task scheduling. NOT thread-safe!
- *
- * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
- * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
- * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
- * understand, but there are opportunities to provide timeout or retry capabilities in the future.
- * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
  */
 public class ConsumerNetworkClient implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
@@ -57,17 +52,20 @@ public class ConsumerNetworkClient implements Closeable {
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
+    private final long unsentExpiryMs;
     // wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
     volatile private boolean wakeupsEnabled = true;
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
                                  Time time,
-                                 long retryBackoffMs) {
+                                 long retryBackoffMs,
+                                 long requestTimeoutMs) {
         this.client = client;
         this.metadata = metadata;
         this.time = time;
         this.retryBackoffMs = retryBackoffMs;
+        this.unsentExpiryMs = requestTimeoutMs;
     }
 
     /**
@@ -227,8 +225,8 @@ public class ConsumerNetworkClient implements Closeable {
         // cleared or a connect finished in the poll
         trySend(now);
 
-        // fail all requests that couldn't be sent
-        failUnsentRequests();
+        // fail requests that couldn't be sent if they have expired
+        failExpiredRequests(now);
     }
 
     /**
@@ -274,29 +272,48 @@ public class ConsumerNetworkClient implements Closeable {
             Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
             Node node = requestEntry.getKey();
             if (client.connectionFailed(node)) {
+                // Remove entry before invoking request callback to avoid callbacks handling
+                // coordinator failures traversing the unsent list again.
+                iterator.remove();
                 for (ClientRequest request : requestEntry.getValue()) {
                     RequestFutureCompletionHandler handler =
                             (RequestFutureCompletionHandler) request.callback();
                     handler.onComplete(new ClientResponse(request, now, true, null));
                 }
-                iterator.remove();
             }
         }
     }
 
-    private void failUnsentRequests() {
-        // clear all unsent requests and fail their corresponding futures
-        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
-            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
-            while (iterator.hasNext()) {
-                ClientRequest request = iterator.next();
-                RequestFutureCompletionHandler handler =
-                        (RequestFutureCompletionHandler) request.callback();
-                handler.raise(SendFailedException.INSTANCE);
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests and fail their corresponding futures
+        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+            Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+            while (requestIterator.hasNext()) {
+                ClientRequest request = requestIterator.next();
+                if (request.createdTimeMs() < now - unsentExpiryMs) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+                    requestIterator.remove();
+                } else
+                    break;
+            }
+            if (requestEntry.getValue().isEmpty())
                 iterator.remove();
+        }
+    }
+
+    protected void failUnsentRequests(Node node, RuntimeException e) {
+        // clear unsent requests to node and fail their corresponding futures
+        List<ClientRequest> unsentRequests = unsent.remove(node);
+        if (unsentRequests != null) {
+            for (ClientRequest request : unsentRequests) {
+                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                handler.raise(e);
             }
         }
-        unsent.clear();
     }
 
     private boolean trySend(long now) {
@@ -320,7 +337,6 @@ public class ConsumerNetworkClient implements Closeable {
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
         if (wakeupsEnabled && wakeup.get()) {
-            failUnsentRequests();
             wakeup.set(false);
             throw new WakeupException();
         }
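
For clarity, the expiry rule added in failExpiredRequests() above can be read in isolation as the
following sketch; the names mirror the diff, but this helper method itself does not exist in the
commit and is shown only as an illustration:

    // A queued-but-unsent request is expired once it has waited longer than request.timeout.ms,
    // which is passed into the ConsumerNetworkClient constructor as requestTimeoutMs and stored
    // as unsentExpiryMs.
    static boolean isExpired(ClientRequest request, long nowMs, long unsentExpiryMs) {
        return request.createdTimeMs() < nowMs - unsentExpiryMs;
    }

Because requests for a node are appended to its unsent list in send order, the loop in the diff can
stop scanning a node's list at the first request that has not yet expired.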

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index 3312a2c..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- */
-public class SendFailedException extends RetriableException {
-    public static final SendFailedException INSTANCE = new SendFailedException();
-
-    private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index c0949e3..554b885 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.errors;
  * not yet been created.
  */
 public class GroupCoordinatorNotAvailableException extends RetriableException {
+    public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 623e5ef..2189c30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -103,7 +103,7 @@ public class ConsumerCoordinatorTest {
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.metadata.update(cluster, time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.defaultOffsetCommitCallback = new MockCommitCallback();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1692010..f0f2a97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -40,7 +42,7 @@ public class ConsumerNetworkClientTest {
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
     @Test
     public void send() {
@@ -104,6 +106,65 @@ public class ConsumerNetworkClientTest {
         assertTrue(future.isDone());
     }
 
+    @Test
+    public void sendExpiry() throws InterruptedException {
+        long unsentExpiryMs = 10;
+        final AtomicBoolean isReady = new AtomicBoolean();
+        final AtomicBoolean disconnected = new AtomicBoolean();
+        client = new MockClient(time) {
+            @Override
+            public boolean ready(Node node, long now) {
+                if (isReady.get())
+                    return super.ready(node, now);
+                else
+                    return false;
+            }
+            @Override
+            public boolean connectionFailed(Node node) {
+                return disconnected.get();
+            }
+        };
+        // Queue first send, sleep long enough for this to expire and then queue second send
+        consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs);
+        RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        assertFalse(future1.isDone());
+
+        time.sleep(unsentExpiryMs + 1);
+        RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(2, consumerClient.pendingRequestCount());
+        assertEquals(2, consumerClient.pendingRequestCount(node));
+        assertFalse(future2.isDone());
+
+        // First send should have expired and second send still pending
+        consumerClient.poll(0);
+        assertTrue(future1.isDone());
+        assertFalse(future1.succeeded());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        assertFalse(future2.isDone());
+
+        // Enable send, the un-expired send should succeed on poll
+        isReady.set(true);
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        consumerClient.poll(future2);
+        ClientResponse clientResponse = future2.value();
+        HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
+        isReady.set(false);
+        RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        disconnected.set(true);
+        consumerClient.poll(0);
+        assertTrue(future3.isDone());
+        assertFalse(future3.succeeded());
+        assertEquals(0, consumerClient.pendingRequestCount());
+        assertEquals(0, consumerClient.pendingRequestCount(node));
+    }
 
     private HeartbeatRequest heartbeatRequest() {
         return new HeartbeatRequest("group", 1, "memberId");

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 58c3841..9002e81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -86,7 +86,7 @@ public class FetcherTest {
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private Metrics metrics = new Metrics(time);
     private static final double EPSILON = 0.0001;
-    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
     private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 7294ed4..57028ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -99,7 +99,8 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,
                     config.getString(DistributedConfig.GROUP_ID_CONFIG),
                     config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index abb62b9..bf33cb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -88,7 +88,7 @@ public class WorkerCoordinatorTest {
         this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.metadata.update(cluster, time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index b857315..ef76ffc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -20,7 +20,7 @@ import kafka.common.KafkaException
 import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.DisconnectException
@@ -43,21 +43,15 @@ class AdminClient(val time: Time,
   private def send(target: Node,
                    api: ApiKeys,
                    request: AbstractRequest): Struct = {
-    var now = time.milliseconds()
-    val deadline = now + requestTimeoutMs
     var future: RequestFuture[ClientResponse] = null
 
-    do {
-      future = client.send(target, api, request)
-      client.poll(future)
+    future = client.send(target, api, request)
+    client.poll(future)
 
-      if (future.succeeded())
-        return future.value().responseBody()
-
-      now = time.milliseconds()
-    } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
-
-    throw future.exception()
+    if (future.succeeded())
+      return future.value().responseBody()
+    else
+      throw future.exception()
   }
 
   private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
@@ -244,7 +238,8 @@ object AdminClient {
       networkClient,
       metadata,
       time,
-      DefaultRetryBackoffMs)
+      DefaultRetryBackoffMs,
+      DefaultRequestTimeoutMs)
 
     new AdminClient(
       time,