You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/21 19:53:55 UTC

kafka git commit: KAFKA-3014: fix integer overflow problem in leastLoadedNode

Repository: kafka
Updated Branches:
  refs/heads/trunk e32131ed6 -> fa4ffb876


KAFKA-3014: fix integer overflow problem in leastLoadedNode

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #696 from hachikuji/KAFKA-3014


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa4ffb87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa4ffb87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa4ffb87

Branch: refs/heads/trunk
Commit: fa4ffb8767da18b2b84ef3d3cbeb2d4c5b804dbe
Parents: e32131e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Dec 21 10:53:51 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Dec 21 10:53:51 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/NetworkClient.java    | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fa4ffb87/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 232a3cb..8c1c543 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,9 +77,6 @@ public class NetworkClient implements KafkaClient {
     /* the client id used to identify this client in requests to the server */
     private final String clientId;
 
-    /* a random offset to use when choosing nodes to avoid having all nodes choose the same node */
-    private final int nodeIndexOffset;
-
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
@@ -144,7 +141,6 @@ public class NetworkClient implements KafkaClient {
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
         this.randOffset = new Random();
-        this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE);
         this.requestTimeoutMs = requestTimeoutMs;
         this.nodesEverSeen = new ArrayList<>();
         this.nodesEverSeenById = new HashMap<>();
@@ -363,8 +359,10 @@ public class NetworkClient implements KafkaClient {
         List<Node> nodes = this.metadataUpdater.fetchNodes();
         int inflight = Integer.MAX_VALUE;
         Node found = null;
+
+        int offset = this.randOffset.nextInt(nodes.size());
         for (int i = 0; i < nodes.size(); i++) {
-            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
+            int idx = (offset + i) % nodes.size();
             Node node = nodes.get(idx);
             int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
             if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
@@ -379,9 +377,9 @@ public class NetworkClient implements KafkaClient {
 
         // if we found no node in the current list, try one from the nodes seen before
         if (found == null && nodesEverSeen.size() > 0) {
-            int offset = randOffset.nextInt(nodesEverSeen.size());
+            offset = randOffset.nextInt(nodesEverSeen.size());
             for (int i = 0; i < nodesEverSeen.size(); i++) {
-                int idx = Utils.abs((offset + i) % nodesEverSeen.size());
+                int idx = (offset + i) % nodesEverSeen.size();
                 Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
                 log.debug("No node found. Trying previously-seen node with ID {}", node.id());
                 if (!this.connectionStates.isBlackedOut(node.idString(), now)) {