You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/22 00:58:11 UTC

kafka git commit: KAFKA-1919 Ensure we backoff if metadata response is empty. Double committed on trunk.

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 cd8f0b87b -> bafecc936


KAFKA-1919 Ensure we backoff if metadata response is empty. Double committed on trunk.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bafecc93
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bafecc93
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bafecc93

Branch: refs/heads/0.8.2
Commit: bafecc936787576faf111f12cd59cc17c3523e4e
Parents: cd8f0b8
Author: Jay Kreps <ja...@gmail.com>
Authored: Sat Feb 21 15:45:58 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sat Feb 21 15:45:58 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/clients/NetworkClient.java    | 6 ++++--
 .../apache/kafka/clients/producer/internals/Metadata.java    | 8 ++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bafecc93/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 6746275..07527d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -309,10 +309,12 @@ public class NetworkClient implements KafkaClient {
         Cluster cluster = response.cluster();
         // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
         // created which means we will get errors and no nodes until it exists
-        if (cluster.nodes().size() > 0)
+        if (cluster.nodes().size() > 0) {
             this.metadata.update(cluster, now);
-        else
+        } else {
             log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+            this.metadata.failedUpdate(now);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/bafecc93/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 1d30f9e..9c319a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -113,6 +113,14 @@ public final class Metadata {
             remainingWaitMs = maxWaitMs - elapsed;
         }
     }
+    
+    /**
+     * Record an attempt to update the metadata that failed. We need to keep track of this
+     * to avoid retrying immediately.
+     */
+    public synchronized void failedUpdate(long now) {
+        this.lastRefreshMs = now;
+    }
 
     /**
      * Get the list of topics we are currently maintaining metadata for