You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/21 19:53:55 UTC
kafka git commit: KAFKA-3014: fix integer overflow problem in leastLoadedNode
Repository: kafka
Updated Branches:
refs/heads/trunk e32131ed6 -> fa4ffb876
KAFKA-3014: fix integer overflow problem in leastLoadedNode
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #696 from hachikuji/KAFKA-3014
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa4ffb87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa4ffb87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa4ffb87
Branch: refs/heads/trunk
Commit: fa4ffb8767da18b2b84ef3d3cbeb2d4c5b804dbe
Parents: e32131e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Dec 21 10:53:51 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Dec 21 10:53:51 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/NetworkClient.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa4ffb87/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 232a3cb..8c1c543 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,9 +77,6 @@ public class NetworkClient implements KafkaClient {
/* the client id used to identify this client in requests to the server */
private final String clientId;
- /* a random offset to use when choosing nodes to avoid having all nodes choose the same node */
- private final int nodeIndexOffset;
-
/* the current correlation id to use when sending requests to servers */
private int correlation;
@@ -144,7 +141,6 @@ public class NetworkClient implements KafkaClient {
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.randOffset = new Random();
- this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE);
this.requestTimeoutMs = requestTimeoutMs;
this.nodesEverSeen = new ArrayList<>();
this.nodesEverSeenById = new HashMap<>();
@@ -363,8 +359,10 @@ public class NetworkClient implements KafkaClient {
List<Node> nodes = this.metadataUpdater.fetchNodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
+
+ int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
- int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
+ int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
@@ -379,9 +377,9 @@ public class NetworkClient implements KafkaClient {
// if we found no node in the current list, try one from the nodes seen before
if (found == null && nodesEverSeen.size() > 0) {
- int offset = randOffset.nextInt(nodesEverSeen.size());
+ offset = randOffset.nextInt(nodesEverSeen.size());
for (int i = 0; i < nodesEverSeen.size(); i++) {
- int idx = Utils.abs((offset + i) % nodesEverSeen.size());
+ int idx = (offset + i) % nodesEverSeen.size();
Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
log.debug("No node found. Trying previously-seen node with ID {}", node.id());
if (!this.connectionStates.isBlackedOut(node.idString(), now)) {