You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/06 19:48:24 UTC

kafka git commit: kafka-1642; (followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 79d16dc36 -> 53329583a


kafka-1642; (followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53329583
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53329583
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53329583

Branch: refs/heads/0.8.2
Commit: 53329583a871ac2027c21979a81dbe0e3bbed4ff
Parents: 79d16dc
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Jan 6 10:48:10 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 6 10:48:10 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java    | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53329583/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 525b95e..6746275 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -180,9 +180,10 @@ public class NetworkClient implements KafkaClient {
 
         // should we update our metadata?
         long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-        long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
+        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
         // if there is no node available to connect, back off refreshing metadata
-        long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
+        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
         if (!this.metadataFetchInProgress && metadataTimeout == 0)
             maybeUpdateMetadata(sends, now);
 
@@ -371,6 +372,8 @@ public class NetworkClient implements KafkaClient {
      * Add a metadata request to the list of sends if we can make one
      */
     private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+        // Beware that the behavior of this method and the computation of timeouts for poll() are
+        // highly dependent on the behavior of leastLoadedNode.
         Node node = this.leastLoadedNode(now);
         if (node == null) {
             log.debug("Give up sending metadata request since no node is available");
@@ -391,6 +394,14 @@ public class NetworkClient implements KafkaClient {
             // we don't have a connection to this node right now, make one
             log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
             initiateConnect(node, now);
+            // If initiateConnect failed immediately, this node will be put into blackout and we
+            // should allow immediately retrying in case there is another candidate node. If it
+            // is still connecting, the worst case is that we end up setting a longer timeout
+            // on the next round and then wait for the response.
+        } else { // connected, but can't send more OR connecting
+            // In either case, we just need to wait for a network event to let us know the selected
+            // connection might be usable again.
+            this.lastNoNodeAvailableMs = now;
         }
     }
 
@@ -400,8 +411,8 @@ public class NetworkClient implements KafkaClient {
     private void initiateConnect(Node node, long now) {
         try {
             log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
             this.connectionStates.connecting(node.id(), now);
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(node.id());