You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/28 00:18:53 UTC
kafka git commit: KAFKA-2677: ensure consumer sees coordinator
disconnects
Repository: kafka
Updated Branches:
refs/heads/trunk e6b343302 -> 0b05d3b93
KAFKA-2677: ensure consumer sees coordinator disconnects
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes #349 from hachikuji/KAFKA-2677
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b05d3b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b05d3b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b05d3b9
Branch: refs/heads/trunk
Commit: 0b05d3b939c5ed37a4253e7c3614d824e76ed664
Parents: e6b3433
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 27 16:24:10 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 27 16:24:10 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 37 ++++-------
.../consumer/internals/AbstractCoordinator.java | 14 +++-
.../internals/ConsumerNetworkClient.java | 70 ++++++++++++++------
.../org/apache/kafka/clients/MockClient.java | 15 +++--
4 files changed, 83 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4265004..2c56751 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -12,16 +12,6 @@
*/
package org.apache.kafka.clients;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
@@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
* user-facing producer and consumer clients.
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient {
/* a list of nodes we've connected to in the past */
private final List<Integer> nodesEverSeen;
private final Map<Integer, Node> nodesEverSeenById;
+
/* random offset into nodesEverSeen list */
private final Random randOffset;
@@ -234,16 +235,6 @@ public class NetworkClient implements KafkaClient {
}
/**
- * Return the state of the connection to the given node
- *
- * @param node The node to check
- * @return The connection state
- */
- public ConnectionState connectionState(String node) {
- return connectionStates.connectionState(node);
- }
-
- /**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
*
* @param request The request
@@ -275,7 +266,6 @@ public class NetworkClient implements KafkaClient {
@Override
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
- long updatedNow = now;
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
@@ -283,7 +273,7 @@ public class NetworkClient implements KafkaClient {
}
// process completed actions
- updatedNow = this.time.milliseconds();
+ long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
@@ -612,9 +602,8 @@ public class NetworkClient implements KafkaClient {
* @param nodes Current alive nodes
*/
private void updateNodesEverSeen(List<Node> nodes) {
- Node existing = null;
for (Node n : nodes) {
- existing = nodesEverSeenById.get(n.id());
+ Node existing = nodesEverSeenById.get(n.id());
if (existing == null) {
nodesEverSeenById.put(n.id(), n);
log.debug("Adding node {} to nodes ever seen", n.id());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4dce586..549c8de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -477,6 +477,8 @@ public abstract class AbstractCoordinator {
groupMetadataResponse.node().host(),
groupMetadataResponse.node().port());
+ client.tryConnect(coordinator);
+
// start sending heartbeats only if we have a valid generation
if (generation > 0)
heartbeatTask.reset();
@@ -488,11 +490,19 @@ public abstract class AbstractCoordinator {
}
/**
- * Check if we know who the coordinator is.
+ * Check if we know who the coordinator is and we have an active connection
* @return true if the coordinator is unknown
*/
public boolean coordinatorUnknown() {
- return this.coordinator == null;
+ if (coordinator == null)
+ return true;
+
+ if (client.connectionFailed(coordinator)) {
+ coordinatorDead();
+ return true;
+ }
+
+ return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index fbfe54a..e3a2514 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -48,7 +48,7 @@ public class ConsumerNetworkClient implements Closeable {
private final KafkaClient client;
private final AtomicBoolean wakeup = new AtomicBoolean(false);
private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
- private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+ private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
@@ -106,7 +106,7 @@ public class ConsumerNetworkClient implements Closeable {
private void put(Node node, ClientRequest request) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
- nodeUnsent = new ArrayList<ClientRequest>();
+ nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
@@ -183,25 +183,28 @@ public class ConsumerNetworkClient implements Closeable {
private void poll(long timeout, long now) {
// send all the requests we can send now
- pollUnsentRequests(now);
- now = time.milliseconds();
-
+ trySend(now);
+
// ensure we don't poll any longer than the deadline for
// the next scheduled task
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
clientPoll(timeout, now);
+ now = time.milliseconds();
+
+ // handle any disconnects by failing the active requests. note that disconnects must
+ // be checked immediately following poll since any subsequent call to client.ready()
+ // will reset the disconnect status
+ checkDisconnects(now);
// execute scheduled tasks
- now = time.milliseconds();
delayedTasks.poll(now);
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
- pollUnsentRequests(now);
+ trySend(now);
// fail all requests that couldn't be sent
- clearUnsentRequests();
-
+ failUnsentRequests();
}
/**
@@ -237,14 +240,27 @@ public class ConsumerNetworkClient implements Closeable {
return total + client.inFlightRequestCount();
}
- private void pollUnsentRequests(long now) {
- while (trySend(now)) {
- clientPoll(0, now);
- now = time.milliseconds();
+ private void checkDisconnects(long now) {
+ // any disconnects affecting requests that have already been transmitted will be handled
+ // by NetworkClient, so we just need to check whether connections for any of the unsent
+ // requests have been disconnected; if they have, then we complete the corresponding future
+ // and set the disconnect flag in the ClientResponse
+ Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+ Node node = requestEntry.getKey();
+ if (client.connectionFailed(node)) {
+ for (ClientRequest request : requestEntry.getValue()) {
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.complete(new ClientResponse(request, now, true, null));
+ }
+ iterator.remove();
+ }
}
}
- private void clearUnsentRequests() {
+ private void failUnsentRequests() {
// clear all unsent requests and fail their corresponding futures
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@@ -271,11 +287,6 @@ public class ConsumerNetworkClient implements Closeable {
client.send(request, now);
iterator.remove();
requestsSent = true;
- } else if (client.connectionFailed(node)) {
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.onComplete(new ClientResponse(request, now, true, null));
- iterator.remove();
}
}
}
@@ -285,7 +296,7 @@ public class ConsumerNetworkClient implements Closeable {
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
if (wakeup.get()) {
- clearUnsentRequests();
+ failUnsentRequests();
wakeup.set(false);
throw new ConsumerWakeupException();
}
@@ -296,6 +307,25 @@ public class ConsumerNetworkClient implements Closeable {
client.close();
}
+ /**
+ * Find whether a previous connection has failed. Note that the failure state will persist until either
+ * {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, AbstractRequest)} has been called.
+ * @param node Node to connect to if possible
+ */
+ public boolean connectionFailed(Node node) {
+ return client.connectionFailed(node);
+ }
+
+ /**
+ * Initiate a connection if currently possible. This is only really useful for resetting the failed
+ * status of a socket. If there is an actual request to send, then {@link #send(Node, ApiKeys, AbstractRequest)}
+ * should be used.
+ * @param node The node to connect to
+ */
+ public void tryConnect(Node node) {
+ client.ready(node, time.milliseconds());
+ }
+
public static class RequestFutureCompletionHandler
extends RequestFuture<ClientResponse>
implements RequestCompletionHandler {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 67d894d..2726e87 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -57,10 +57,10 @@ public class MockClient implements KafkaClient {
private final Time time;
private int correlation = 0;
private Node node = null;
- private final Set<Integer> ready = new HashSet<Integer>();
- private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
- private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
- private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
+ private final Set<Integer> ready = new HashSet<>();
+ private final Queue<ClientRequest> requests = new ArrayDeque<>();
+ private final Queue<ClientResponse> responses = new ArrayDeque<>();
+ private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
public MockClient(Time time) {
this.time = time;
@@ -88,11 +88,12 @@ public class MockClient implements KafkaClient {
}
public void disconnect(String node) {
+ long now = time.milliseconds();
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
ClientRequest request = iter.next();
if (request.request().destination() == node) {
- responses.add(new ClientResponse(request, time.milliseconds(), true, null));
+ responses.add(new ClientResponse(request, now, true, null));
iter.remove();
}
}
@@ -146,7 +147,7 @@ public class MockClient implements KafkaClient {
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
- * match, {@link #send(ClientRequest)} will throw IllegalStateException
+ * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
*/
@@ -160,7 +161,7 @@ public class MockClient implements KafkaClient {
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
- * match, {@link #send(ClientRequest)} will throw IllegalStateException
+ * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
* @param disconnected Whether the request was disconnected