You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 18:36:35 UTC
kafka git commit: KAFKA-3068: Remove retry with nodesEverSeen
Repository: kafka
Updated Branches:
refs/heads/trunk 4adfd7960 -> c4f32c53e
KAFKA-3068: Remove retry with nodesEverSeen
ewencp, ijuma: if this looks good, please merge when you can. Thanks.
Author: Eno Thereska <en...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #823 from enothereska/kafka-3068-alt
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4f32c53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4f32c53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4f32c53
Branch: refs/heads/trunk
Commit: c4f32c53ede93027aa08b1cc6c9f18ec71a702a0
Parents: 4adfd79
Author: Eno Thereska <en...@gmail.com>
Authored: Tue Feb 2 09:36:22 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 2 09:36:22 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 48 +-------------------
1 file changed, 1 insertion(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4f32c53/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8c1c543..d4c4069 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -33,10 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -54,14 +52,9 @@ public class NetworkClient implements KafkaClient {
private final Selectable selector;
private final MetadataUpdater metadataUpdater;
-
- /* a list of nodes we've connected to in the past */
- private final List<Integer> nodesEverSeen;
- private final Map<Integer, Node> nodesEverSeenById;
- /* random offset into nodesEverSeen list */
private final Random randOffset;
-
+
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
@@ -142,9 +135,6 @@ public class NetworkClient implements KafkaClient {
this.correlation = 0;
this.randOffset = new Random();
this.requestTimeoutMs = requestTimeoutMs;
- this.nodesEverSeen = new ArrayList<>();
- this.nodesEverSeenById = new HashMap<>();
-
this.time = time;
}
@@ -374,19 +364,6 @@ public class NetworkClient implements KafkaClient {
found = node;
}
}
-
- // if we found no node in the current list, try one from the nodes seen before
- if (found == null && nodesEverSeen.size() > 0) {
- offset = randOffset.nextInt(nodesEverSeen.size());
- for (int i = 0; i < nodesEverSeen.size(); i++) {
- int idx = (offset + i) % nodesEverSeen.size();
- Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
- log.debug("No node found. Trying previously-seen node with ID {}", node.id());
- if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
- found = node;
- }
- }
- }
return found;
}
@@ -596,28 +573,6 @@ public class NetworkClient implements KafkaClient {
this.metadata.requestUpdate();
}
- /*
- * Keep track of any nodes we've ever seen. Add current
- * alive nodes to this tracking list.
- * @param nodes Current alive nodes
- */
- private void updateNodesEverSeen(List<Node> nodes) {
- for (Node n : nodes) {
- Node existing = nodesEverSeenById.get(n.id());
- if (existing == null) {
- nodesEverSeenById.put(n.id(), n);
- log.debug("Adding node {} to nodes ever seen", n.id());
- nodesEverSeen.add(n.id());
- } else {
- // check if the nodes are really equal. There could be a case
- // where node.id() is the same but node has moved to different host
- if (!existing.equals(n)) {
- nodesEverSeenById.put(n.id(), n);
- }
- }
- }
- }
-
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
@@ -630,7 +585,6 @@ public class NetworkClient implements KafkaClient {
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
- this.updateNodesEverSeen(cluster.nodes());
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now);