You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/24 23:48:57 UTC

kafka git commit: KAFKA-2042; Update topic list of the metadata regardless of cluster information; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk c62cff355 -> 991195416


KAFKA-2042; Update topic list of the metadata regardless of cluster information; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99119541
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99119541
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99119541

Branch: refs/heads/trunk
Commit: 991195416e0c179d2d2a79891d0214244c287618
Parents: c62cff3
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Mar 24 15:48:46 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 24 15:48:46 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/clients/Metadata.java    | 9 +++++++++
 .../org/apache/kafka/clients/producer/KafkaProducer.java    | 9 ++++++---
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index c8bde8b..07f1cdb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -132,6 +132,15 @@ public final class Metadata {
     }
 
     /**
+     * Check if a topic is already in the topic set.
+     * @param topic topic to check
+     * @return true if the topic exists, false otherwise
+     */
+    public synchronized boolean containsTopic(String topic) {
+        return this.topics.contains(topic);
+    }
+
+    /**
      * Update the cluster metadata
      */
     public synchronized void update(Cluster cluster, long now) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index feda9c9..ab26342 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -382,8 +382,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.sender.wakeup();
             }
             return result.future;
-            // Handling exceptions and record the errors;
-            // For API exceptions return them in the future,
+            // handling exceptions and record the errors;
+            // for API exceptions return them in the future,
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
@@ -406,6 +406,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      */
     private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
+        // add topic to metadata topic list if it is not there already.
+        if (!this.metadata.containsTopic(topic))
+            this.metadata.add(topic);
+
         if (metadata.fetch().partitionsForTopic(topic) != null) {
             return;
         } else {
@@ -414,7 +418,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             while (metadata.fetch().partitionsForTopic(topic) == null) {
                 log.trace("Requesting metadata update for topic {}.", topic);
                 int version = metadata.requestUpdate();
-                metadata.add(topic);
                 sender.wakeup();
                 metadata.awaitUpdate(version, remainingWaitMs);
                 long elapsed = time.milliseconds() - begin;