You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/05 06:29:06 UTC

kafka git commit: KAFKA-2998: log warnings when client is disconnected from bootstrap brokers

Repository: kafka
Updated Branches:
  refs/heads/trunk c3c9289c1 -> 4ff667711


KAFKA-2998: log warnings when client is disconnected from bootstrap brokers

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Grant Henke, Guozhang Wang

Closes #769 from hachikuji/KAFKA-2998


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ff66771
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ff66771
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ff66771

Branch: refs/heads/trunk
Commit: 4ff667711ffd477872adbd3e3e39bd94eac6e763
Parents: c3c9289
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Apr 4 21:28:59 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 4 21:28:59 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java    |  8 ++++++++
 .../main/java/org/apache/kafka/common/Cluster.java | 17 +++++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ff66771/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d22b508..d2eaace 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -556,6 +556,14 @@ public class NetworkClient implements KafkaClient {
             ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
 
             if (requestKey == ApiKeys.METADATA) {
+                Cluster cluster = metadata.fetch();
+                if (cluster.isBootstrapConfigured()) {
+                    int nodeId = Integer.parseInt(request.request().destination());
+                    Node node = cluster.nodeById(nodeId);
+                    if (node != null)
+                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
+                }
+
                 metadataFetchInProgress = false;
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ff66771/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8e85df8..e1bf581 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -29,6 +29,7 @@ import java.util.Set;
  */
 public final class Cluster {
 
+    private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -45,11 +46,19 @@ public final class Cluster {
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
+        this(false, nodes, partitions, unauthorizedTopics);
+    }
+
+    private Cluster(boolean isBootstrapConfigured,
+                    Collection<Node> nodes,
+                    Collection<PartitionInfo> partitions,
+                    Set<String> unauthorizedTopics) {
+        this.isBootstrapConfigured = isBootstrapConfigured;
+
         // make a randomized, unmodifiable copy of the nodes
         List<Node> copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);
-        
         this.nodesById = new HashMap<>();
         for (Node node : nodes)
             this.nodesById.put(node.id(), node);
@@ -115,7 +124,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
     }
 
     /**
@@ -214,6 +223,10 @@ public final class Cluster {
         return unauthorizedTopics;
     }
 
+    public boolean isBootstrapConfigured() {
+        return isBootstrapConfigured;
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";