You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/06 19:48:24 UTC
kafka git commit: kafka-1642;
(followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to
100% when network connection is lost; patched by Ewen Cheslack-Postava;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.8.2 79d16dc36 -> 53329583a
kafka-1642; (followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53329583
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53329583
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53329583
Branch: refs/heads/0.8.2
Commit: 53329583a871ac2027c21979a81dbe0e3bbed4ff
Parents: 79d16dc
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Jan 6 10:48:10 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 6 10:48:10 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/53329583/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 525b95e..6746275 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -180,9 +180,10 @@ public class NetworkClient implements KafkaClient {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
+ long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+ long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
// if there is no node available to connect, back off refreshing metadata
- long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
+ long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
if (!this.metadataFetchInProgress && metadataTimeout == 0)
maybeUpdateMetadata(sends, now);
@@ -371,6 +372,8 @@ public class NetworkClient implements KafkaClient {
* Add a metadata request to the list of sends if we can make one
*/
private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+ // Beware that the behavior of this method and the computation of timeouts for poll() are
+ // highly dependent on the behavior of leastLoadedNode.
Node node = this.leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
@@ -391,6 +394,14 @@ public class NetworkClient implements KafkaClient {
// we don't have a connection to this node right now, make one
log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
initiateConnect(node, now);
+ // If initiateConnect failed immediately, this node will be put into blackout and we
+ // should allow immediately retrying in case there is another candidate node. If it
+ // is still connecting, the worst case is that we end up setting a longer timeout
+ // on the next round and then wait for the response.
+ } else { // connected, but can't send more OR connecting
+ // In either case, we just need to wait for a network event to let us know the selected
+ // connection might be usable again.
+ this.lastNoNodeAvailableMs = now;
}
}
@@ -400,8 +411,8 @@ public class NetworkClient implements KafkaClient {
private void initiateConnect(Node node, long now) {
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
- selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);
+ selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(node.id());