You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:45 UTC
[29/50] [abbrv] kafka git commit: KAFKA-2998: log warnings when
client is disconnected from bootstrap brokers
KAFKA-2998: log warnings when client is disconnected from bootstrap brokers
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Grant Henke, Guozhang Wang
Closes #769 from hachikuji/KAFKA-2998
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ff66771
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ff66771
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ff66771
Branch: refs/heads/0.10.0
Commit: 4ff667711ffd477872adbd3e3e39bd94eac6e763
Parents: c3c9289
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Apr 4 21:28:59 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 4 21:28:59 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 8 ++++++++
.../main/java/org/apache/kafka/common/Cluster.java | 17 +++++++++++++++--
2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ff66771/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d22b508..d2eaace 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -556,6 +556,14 @@ public class NetworkClient implements KafkaClient {
ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
if (requestKey == ApiKeys.METADATA) {
+ Cluster cluster = metadata.fetch();
+ if (cluster.isBootstrapConfigured()) {
+ int nodeId = Integer.parseInt(request.request().destination());
+ Node node = cluster.nodeById(nodeId);
+ if (node != null)
+ log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
+ }
+
metadataFetchInProgress = false;
return true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ff66771/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8e85df8..e1bf581 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -29,6 +29,7 @@ import java.util.Set;
*/
public final class Cluster {
+ private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -45,11 +46,19 @@ public final class Cluster {
public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
+ this(false, nodes, partitions, unauthorizedTopics);
+ }
+
+ private Cluster(boolean isBootstrapConfigured,
+ Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics) {
+ this.isBootstrapConfigured = isBootstrapConfigured;
+
// make a randomized, unmodifiable copy of the nodes
List<Node> copy = new ArrayList<>(nodes);
Collections.shuffle(copy);
this.nodes = Collections.unmodifiableList(copy);
-
this.nodesById = new HashMap<>();
for (Node node : nodes)
this.nodesById.put(node.id(), node);
@@ -115,7 +124,7 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
- return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+ return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
}
/**
@@ -214,6 +223,10 @@ public final class Cluster {
return unauthorizedTopics;
}
+ public boolean isBootstrapConfigured() {
+ return isBootstrapConfigured;
+ }
+
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";