You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/28 00:18:53 UTC

kafka git commit: KAFKA-2677: ensure consumer sees coordinator disconnects

Repository: kafka
Updated Branches:
  refs/heads/trunk e6b343302 -> 0b05d3b93


KAFKA-2677: ensure consumer sees coordinator disconnects

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #349 from hachikuji/KAFKA-2677


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b05d3b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b05d3b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b05d3b9

Branch: refs/heads/trunk
Commit: 0b05d3b939c5ed37a4253e7c3614d824e76ed664
Parents: e6b3433
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 27 16:24:10 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 27 16:24:10 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 37 ++++-------
 .../consumer/internals/AbstractCoordinator.java | 14 +++-
 .../internals/ConsumerNetworkClient.java        | 70 ++++++++++++++------
 .../org/apache/kafka/clients/MockClient.java    | 15 +++--
 4 files changed, 83 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4265004..2c56751 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -12,16 +12,6 @@
  */
 package org.apache.kafka.clients;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
  * user-facing producer and consumer clients.
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient {
     /* a list of nodes we've connected to in the past */
     private final List<Integer> nodesEverSeen;
     private final Map<Integer, Node> nodesEverSeenById;
+
     /* random offset into nodesEverSeen list */
     private final Random randOffset;
     
@@ -234,16 +235,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Return the state of the connection to the given node
-     *
-     * @param node The node to check
-     * @return The connection state
-     */
-    public ConnectionState connectionState(String node) {
-        return connectionStates.connectionState(node);
-    }
-
-    /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      *
      * @param request The request
@@ -275,7 +266,6 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
-        long updatedNow = now;
         try {
             this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
@@ -283,7 +273,7 @@ public class NetworkClient implements KafkaClient {
         }
 
         // process completed actions
-        updatedNow = this.time.milliseconds();
+        long updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
@@ -612,9 +602,8 @@ public class NetworkClient implements KafkaClient {
          * @param nodes Current alive nodes
          */
         private void updateNodesEverSeen(List<Node> nodes) {
-            Node existing = null;
             for (Node n : nodes) {
-                existing = nodesEverSeenById.get(n.id());
+                Node existing = nodesEverSeenById.get(n.id());
                 if (existing == null) {
                     nodesEverSeenById.put(n.id(), n);
                     log.debug("Adding node {} to nodes ever seen", n.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4dce586..549c8de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -477,6 +477,8 @@ public abstract class AbstractCoordinator {
                         groupMetadataResponse.node().host(),
                         groupMetadataResponse.node().port());
 
+                client.tryConnect(coordinator);
+
                 // start sending heartbeats only if we have a valid generation
                 if (generation > 0)
                     heartbeatTask.reset();
@@ -488,11 +490,19 @@ public abstract class AbstractCoordinator {
     }
 
     /**
-     * Check if we know who the coordinator is.
+     * Check if we know who the coordinator is and we have an active connection
      * @return true if the coordinator is unknown
      */
     public boolean coordinatorUnknown() {
-        return this.coordinator == null;
+        if (coordinator == null)
+            return true;
+
+        if (client.connectionFailed(coordinator)) {
+            coordinatorDead();
+            return true;
+        }
+
+        return false;
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index fbfe54a..e3a2514 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -48,7 +48,7 @@ public class ConsumerNetworkClient implements Closeable {
     private final KafkaClient client;
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
     private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
-    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+    private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
@@ -106,7 +106,7 @@ public class ConsumerNetworkClient implements Closeable {
     private void put(Node node, ClientRequest request) {
         List<ClientRequest> nodeUnsent = unsent.get(node);
         if (nodeUnsent == null) {
-            nodeUnsent = new ArrayList<ClientRequest>();
+            nodeUnsent = new ArrayList<>();
             unsent.put(node, nodeUnsent);
         }
         nodeUnsent.add(request);
@@ -183,25 +183,28 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void poll(long timeout, long now) {
         // send all the requests we can send now
-        pollUnsentRequests(now);
-        now = time.milliseconds();
-        
+        trySend(now);
+
         // ensure we don't poll any longer than the deadline for
         // the next scheduled task
         timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
         clientPoll(timeout, now);
+        now = time.milliseconds();
+
+        // handle any disconnects by failing the active requests. note that disconnects must
+        // be checked immediately following poll since any subsequent call to client.ready()
+        // will reset the disconnect status
+        checkDisconnects(now);
 
         // execute scheduled tasks
-        now = time.milliseconds();
         delayedTasks.poll(now);
 
         // try again to send requests since buffer space may have been
         // cleared or a connect finished in the poll
-        pollUnsentRequests(now);
+        trySend(now);
 
         // fail all requests that couldn't be sent
-        clearUnsentRequests();
-
+        failUnsentRequests();
     }
 
     /**
@@ -237,14 +240,27 @@ public class ConsumerNetworkClient implements Closeable {
         return total + client.inFlightRequestCount();
     }
 
-    private void pollUnsentRequests(long now) {
-        while (trySend(now)) {
-            clientPoll(0, now);
-            now = time.milliseconds();
+    private void checkDisconnects(long now) {
+        // any disconnects affecting requests that have already been transmitted will be handled
+        // by NetworkClient, so we just need to check whether connections for any of the unsent
+        // requests have been disconnected; if they have, then we complete the corresponding future
+        // and set the disconnect flag in the ClientResponse
+        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+            Node node = requestEntry.getKey();
+            if (client.connectionFailed(node)) {
+                for (ClientRequest request : requestEntry.getValue()) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.complete(new ClientResponse(request, now, true, null));
+                }
+                iterator.remove();
+            }
         }
     }
 
-    private void clearUnsentRequests() {
+    private void failUnsentRequests() {
         // clear all unsent requests and fail their corresponding futures
         for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
             Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@@ -271,11 +287,6 @@ public class ConsumerNetworkClient implements Closeable {
                     client.send(request, now);
                     iterator.remove();
                     requestsSent = true;
-                } else if (client.connectionFailed(node)) {
-                    RequestFutureCompletionHandler handler =
-                            (RequestFutureCompletionHandler) request.callback();
-                    handler.onComplete(new ClientResponse(request, now, true, null));
-                    iterator.remove();
                 }
             }
         }
@@ -285,7 +296,7 @@ public class ConsumerNetworkClient implements Closeable {
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
         if (wakeup.get()) {
-            clearUnsentRequests();
+            failUnsentRequests();
             wakeup.set(false);
             throw new ConsumerWakeupException();
         }
@@ -296,6 +307,25 @@ public class ConsumerNetworkClient implements Closeable {
         client.close();
     }
 
+    /**
+     * Find whether a previous connection has failed. Note that the failure state will persist until either
+     * {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, AbstractRequest)} has been called.
+     * @param node The node whose connection failure state is checked
+     */
+    public boolean connectionFailed(Node node) {
+        return client.connectionFailed(node);
+    }
+
+    /**
+     * Initiate a connection if currently possible. This is only really useful for resetting the failed
+     * status of a socket. If there is an actual request to send, then {@link #send(Node, ApiKeys, AbstractRequest)}
+     * should be used.
+     * @param node The node to connect to
+     */
+    public void tryConnect(Node node) {
+        client.ready(node, time.milliseconds());
+    }
+
     public static class RequestFutureCompletionHandler
             extends RequestFuture<ClientResponse>
             implements RequestCompletionHandler {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 67d894d..2726e87 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -57,10 +57,10 @@ public class MockClient implements KafkaClient {
     private final Time time;
     private int correlation = 0;
     private Node node = null;
-    private final Set<Integer> ready = new HashSet<Integer>();
-    private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
-    private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
-    private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
+    private final Set<Integer> ready = new HashSet<>();
+    private final Queue<ClientRequest> requests = new ArrayDeque<>();
+    private final Queue<ClientResponse> responses = new ArrayDeque<>();
+    private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
 
     public MockClient(Time time) {
         this.time = time;
@@ -88,11 +88,12 @@ public class MockClient implements KafkaClient {
     }
 
     public void disconnect(String node) {
+        long now = time.milliseconds();
         Iterator<ClientRequest> iter = requests.iterator();
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
             if (request.request().destination() == node) {
-                responses.add(new ClientResponse(request, time.milliseconds(), true, null));
+                responses.add(new ClientResponse(request, now, true, null));
                 iter.remove();
             }
         }
@@ -146,7 +147,7 @@ public class MockClient implements KafkaClient {
 
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
-     * match, {@link #send(ClientRequest)} will throw IllegalStateException
+     * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
      * @param matcher The matcher to apply
      * @param body The response body
      */
@@ -160,7 +161,7 @@ public class MockClient implements KafkaClient {
 
     /**
      * Prepare a response for a request matching the provided matcher. If the matcher does not
-     * match, {@link #send(ClientRequest)} will throw IllegalStateException
+     * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
      * @param matcher The matcher to apply
      * @param body The response body
      * @param disconnected Whether the request was disconnected