You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/22 00:58:11 UTC
kafka git commit: KAFKA-1919 Ensure we back off if metadata response
is empty. Double committed on trunk.
Repository: kafka
Updated Branches:
refs/heads/0.8.2 cd8f0b87b -> bafecc936
KAFKA-1919 Ensure we back off if metadata response is empty. Double committed on trunk.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bafecc93
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bafecc93
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bafecc93
Branch: refs/heads/0.8.2
Commit: bafecc936787576faf111f12cd59cc17c3523e4e
Parents: cd8f0b8
Author: Jay Kreps <ja...@gmail.com>
Authored: Sat Feb 21 15:45:58 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sat Feb 21 15:45:58 2015 -0800
----------------------------------------------------------------------
.../main/java/org/apache/kafka/clients/NetworkClient.java | 6 ++++--
.../apache/kafka/clients/producer/internals/Metadata.java | 8 ++++++++
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bafecc93/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 6746275..07527d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -309,10 +309,12 @@ public class NetworkClient implements KafkaClient {
Cluster cluster = response.cluster();
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
- if (cluster.nodes().size() > 0)
+ if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
- else
+ } else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+ this.metadata.failedUpdate(now);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/bafecc93/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 1d30f9e..9c319a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -113,6 +113,14 @@ public final class Metadata {
remainingWaitMs = maxWaitMs - elapsed;
}
}
+
+ /**
+ * Record an attempt to update the metadata that failed. We need to keep track of this
+ * to avoid retrying immediately.
+ */
+ public synchronized void failedUpdate(long now) {
+ this.lastRefreshMs = now;
+ }
/**
* Get the list of topics we are currently maintaining metadata for