You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:52 UTC
[36/50] [abbrv] kafka git commit: KAFKA-3488;
Avoid failing of unsent requests in consumer where possible
KAFKA-3488; Avoid failing of unsent requests in consumer where possible
Fail unsent requests only when returning from KafkaConsumer.poll().
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1183 from rajinisivaram/KAFKA-3488
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c34df15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c34df15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c34df15
Branch: refs/heads/0.10.0
Commit: 9c34df1511a769b272893b75ec1ed90d38cc9576
Parents: 8dbd688
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Apr 7 15:48:50 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 7 15:48:50 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../consumer/internals/AbstractCoordinator.java | 2 +
.../consumer/internals/ConsumerCoordinator.java | 3 -
.../internals/ConsumerNetworkClient.java | 58 +++++++++++-------
.../consumer/internals/SendFailedException.java | 27 ---------
.../GroupCoordinatorNotAvailableException.java | 1 +
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../internals/ConsumerNetworkClientTest.java | 63 +++++++++++++++++++-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../runtime/distributed/WorkerGroupMember.java | 3 +-
.../distributed/WorkerCoordinatorTest.java | 2 +-
.../main/scala/kafka/admin/AdminClient.scala | 23 +++----
12 files changed, 118 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c457c83..5576431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -584,7 +584,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
- this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+ config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c79d8e7..1e6757e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -521,6 +522,7 @@ public abstract class AbstractCoordinator implements Closeable {
protected void coordinatorDead() {
if (this.coordinator != null) {
log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
+ client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
this.coordinator = null;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a364987..86b60d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,9 +422,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
reschedule(now + interval);
- } else if (exception instanceof SendFailedException) {
- log.debug("Failed to send automatic offset commit for group {}", groupId);
- reschedule(now);
} else {
log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
reschedule(now + interval);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b70994d..4119954 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,12 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Higher level consumer access to the network layer with basic support for futures and
* task scheduling. NOT thread-safe!
- *
- * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
- * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
- * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
- * understand, but there are opportunities to provide timeout or retry capabilities in the future.
- * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
*/
public class ConsumerNetworkClient implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
@@ -57,17 +52,20 @@ public class ConsumerNetworkClient implements Closeable {
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
+ private final long unsentExpiryMs;
// wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
volatile private boolean wakeupsEnabled = true;
public ConsumerNetworkClient(KafkaClient client,
Metadata metadata,
Time time,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ long requestTimeoutMs) {
this.client = client;
this.metadata = metadata;
this.time = time;
this.retryBackoffMs = retryBackoffMs;
+ this.unsentExpiryMs = requestTimeoutMs;
}
/**
@@ -227,8 +225,8 @@ public class ConsumerNetworkClient implements Closeable {
// cleared or a connect finished in the poll
trySend(now);
- // fail all requests that couldn't be sent
- failUnsentRequests();
+ // fail requests that couldn't be sent if they have expired
+ failExpiredRequests(now);
}
/**
@@ -274,29 +272,48 @@ public class ConsumerNetworkClient implements Closeable {
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Node node = requestEntry.getKey();
if (client.connectionFailed(node)) {
+ // Remove entry before invoking request callback to avoid callbacks handling
+ // coordinator failures traversing the unsent list again.
+ iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request, now, true, null));
}
- iterator.remove();
}
}
}
- private void failUnsentRequests() {
- // clear all unsent requests and fail their corresponding futures
- for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
- Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
- while (iterator.hasNext()) {
- ClientRequest request = iterator.next();
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.raise(SendFailedException.INSTANCE);
+ private void failExpiredRequests(long now) {
+ // clear all expired unsent requests and fail their corresponding futures
+ Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+ Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+ while (requestIterator.hasNext()) {
+ ClientRequest request = requestIterator.next();
+ if (request.createdTimeMs() < now - unsentExpiryMs) {
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+ requestIterator.remove();
+ } else
+ break;
+ }
+ if (requestEntry.getValue().isEmpty())
iterator.remove();
+ }
+ }
+
+ protected void failUnsentRequests(Node node, RuntimeException e) {
+ // clear unsent requests to node and fail their corresponding futures
+ List<ClientRequest> unsentRequests = unsent.remove(node);
+ if (unsentRequests != null) {
+ for (ClientRequest request : unsentRequests) {
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+ handler.raise(e);
}
}
- unsent.clear();
}
private boolean trySend(long now) {
@@ -320,7 +337,6 @@ public class ConsumerNetworkClient implements Closeable {
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
if (wakeupsEnabled && wakeup.get()) {
- failUnsentRequests();
wakeup.set(false);
throw new WakeupException();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index 3312a2c..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- */
-public class SendFailedException extends RetriableException {
- public static final SendFailedException INSTANCE = new SendFailedException();
-
- private static final long serialVersionUID = 1L;
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index c0949e3..554b885 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.errors;
* not yet been created.
*/
public class GroupCoordinatorNotAvailableException extends RetriableException {
+ public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 623e5ef..2189c30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -103,7 +103,7 @@ public class ConsumerCoordinatorTest {
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
- this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.defaultOffsetCommitCallback = new MockCommitCallback();
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1692010..f0f2a97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -40,7 +42,7 @@ public class ConsumerNetworkClientTest {
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
- private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
@Test
public void send() {
@@ -104,6 +106,65 @@ public class ConsumerNetworkClientTest {
assertTrue(future.isDone());
}
+ @Test
+ public void sendExpiry() throws InterruptedException {
+ long unsentExpiryMs = 10;
+ final AtomicBoolean isReady = new AtomicBoolean();
+ final AtomicBoolean disconnected = new AtomicBoolean();
+ client = new MockClient(time) {
+ @Override
+ public boolean ready(Node node, long now) {
+ if (isReady.get())
+ return super.ready(node, now);
+ else
+ return false;
+ }
+ @Override
+ public boolean connectionFailed(Node node) {
+ return disconnected.get();
+ }
+ };
+ // Queue first send, sleep long enough for this to expire and then queue second send
+ consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs);
+ RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ assertFalse(future1.isDone());
+
+ time.sleep(unsentExpiryMs + 1);
+ RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(2, consumerClient.pendingRequestCount());
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+ assertFalse(future2.isDone());
+
+ // First send should have expired and second send still pending
+ consumerClient.poll(0);
+ assertTrue(future1.isDone());
+ assertFalse(future1.succeeded());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ assertFalse(future2.isDone());
+
+ // Enable send, the un-expired send should succeed on poll
+ isReady.set(true);
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ consumerClient.poll(future2);
+ ClientResponse clientResponse = future2.value();
+ HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+ assertEquals(Errors.NONE.code(), response.errorCode());
+
+ // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
+ isReady.set(false);
+ RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ disconnected.set(true);
+ consumerClient.poll(0);
+ assertTrue(future3.isDone());
+ assertFalse(future3.succeeded());
+ assertEquals(0, consumerClient.pendingRequestCount());
+ assertEquals(0, consumerClient.pendingRequestCount(node));
+ }
private HeartbeatRequest heartbeatRequest() {
return new HeartbeatRequest("group", 1, "memberId");
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 58c3841..9002e81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -86,7 +86,7 @@ public class FetcherTest {
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private Metrics metrics = new Metrics(time);
private static final double EPSILON = 0.0001;
- private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 7294ed4..57028ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -99,7 +99,8 @@ public class WorkerGroupMember {
config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
- this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+ config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
this.coordinator = new WorkerCoordinator(this.client,
config.getString(DistributedConfig.GROUP_ID_CONFIG),
config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index abb62b9..bf33cb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -88,7 +88,7 @@ public class WorkerCoordinatorTest {
this.client = new MockClient(time);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
- this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index b857315..ef76ffc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -20,7 +20,7 @@ import kafka.common.KafkaException
import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
import kafka.utils.Logging
import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.DisconnectException
@@ -43,21 +43,15 @@ class AdminClient(val time: Time,
private def send(target: Node,
api: ApiKeys,
request: AbstractRequest): Struct = {
- var now = time.milliseconds()
- val deadline = now + requestTimeoutMs
var future: RequestFuture[ClientResponse] = null
- do {
- future = client.send(target, api, request)
- client.poll(future)
+ future = client.send(target, api, request)
+ client.poll(future)
- if (future.succeeded())
- return future.value().responseBody()
-
- now = time.milliseconds()
- } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
-
- throw future.exception()
+ if (future.succeeded())
+ return future.value().responseBody()
+ else
+ throw future.exception()
}
private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
@@ -244,7 +238,8 @@ object AdminClient {
networkClient,
metadata,
time,
- DefaultRetryBackoffMs)
+ DefaultRetryBackoffMs,
+ DefaultRequestTimeoutMs)
new AdminClient(
time,