You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 18:36:35 UTC

kafka git commit: KAFKA-3068: Remove retry with nodesEverSeen

Repository: kafka
Updated Branches:
  refs/heads/trunk 4adfd7960 -> c4f32c53e


KAFKA-3068: Remove retry with nodesEverSeen

ewencp ijuma if this looks good please merge when you can. Thanks.

Author: Eno Thereska <en...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #823 from enothereska/kafka-3068-alt


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4f32c53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4f32c53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4f32c53

Branch: refs/heads/trunk
Commit: c4f32c53ede93027aa08b1cc6c9f18ec71a702a0
Parents: 4adfd79
Author: Eno Thereska <en...@gmail.com>
Authored: Tue Feb 2 09:36:22 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 2 09:36:22 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 48 +-------------------
 1 file changed, 1 insertion(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4f32c53/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8c1c543..d4c4069 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -33,10 +33,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -54,14 +52,9 @@ public class NetworkClient implements KafkaClient {
     private final Selectable selector;
     
     private final MetadataUpdater metadataUpdater;
-    
-    /* a list of nodes we've connected to in the past */
-    private final List<Integer> nodesEverSeen;
-    private final Map<Integer, Node> nodesEverSeenById;
 
-    /* random offset into nodesEverSeen list */
     private final Random randOffset;
-    
+
     /* the state of each node's connection */
     private final ClusterConnectionStates connectionStates;
 
@@ -142,9 +135,6 @@ public class NetworkClient implements KafkaClient {
         this.correlation = 0;
         this.randOffset = new Random();
         this.requestTimeoutMs = requestTimeoutMs;
-        this.nodesEverSeen = new ArrayList<>();
-        this.nodesEverSeenById = new HashMap<>();
-        
         this.time = time;
     }
 
@@ -374,19 +364,6 @@ public class NetworkClient implements KafkaClient {
                 found = node;
             }
         }
-
-        // if we found no node in the current list, try one from the nodes seen before
-        if (found == null && nodesEverSeen.size() > 0) {
-            offset = randOffset.nextInt(nodesEverSeen.size());
-            for (int i = 0; i < nodesEverSeen.size(); i++) {
-                int idx = (offset + i) % nodesEverSeen.size();
-                Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
-                log.debug("No node found. Trying previously-seen node with ID {}", node.id());
-                if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
-                    found = node;
-                }
-            }
-        }
         
         return found;
     }
@@ -596,28 +573,6 @@ public class NetworkClient implements KafkaClient {
             this.metadata.requestUpdate();
         }
 
-        /*
-         * Keep track of any nodes we've ever seen. Add current
-         * alive nodes to this tracking list. 
-         * @param nodes Current alive nodes
-         */
-        private void updateNodesEverSeen(List<Node> nodes) {
-            for (Node n : nodes) {
-                Node existing = nodesEverSeenById.get(n.id());
-                if (existing == null) {
-                    nodesEverSeenById.put(n.id(), n);
-                    log.debug("Adding node {} to nodes ever seen", n.id());
-                    nodesEverSeen.add(n.id());
-                } else {
-                    // check if the nodes are really equal. There could be a case
-                    // where node.id() is the same but node has moved to different host
-                    if (!existing.equals(n)) {
-                        nodesEverSeenById.put(n.id(), n);
-                    }
-                }
-            }
-        }
-
         private void handleResponse(RequestHeader header, Struct body, long now) {
             this.metadataFetchInProgress = false;
             MetadataResponse response = new MetadataResponse(body);
@@ -630,7 +585,6 @@ public class NetworkClient implements KafkaClient {
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
                 this.metadata.update(cluster, now);
-                this.updateNodesEverSeen(cluster.nodes());
             } else {
                 log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                 this.metadata.failedUpdate(now);