You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/24 23:48:57 UTC
kafka git commit: KAFKA-2042;
Update topic list of the metadata regardless of cluster information;
reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk c62cff355 -> 991195416
KAFKA-2042; Update topic list of the metadata regardless of cluster information; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99119541
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99119541
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99119541
Branch: refs/heads/trunk
Commit: 991195416e0c179d2d2a79891d0214244c287618
Parents: c62cff3
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Mar 24 15:48:46 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 24 15:48:46 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++++++++
.../org/apache/kafka/clients/producer/KafkaProducer.java | 9 ++++++---
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index c8bde8b..07f1cdb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -132,6 +132,15 @@ public final class Metadata {
}
/**
+ * Check if a topic is already in the topic set.
+ * @param topic topic to check
+ * @return true if the topic exists, false otherwise
+ */
+ public synchronized boolean containsTopic(String topic) {
+ return this.topics.contains(topic);
+ }
+
+ /**
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index feda9c9..ab26342 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -382,8 +382,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.sender.wakeup();
}
return result.future;
- // Handling exceptions and record the errors;
- // For API exceptions return them in the future,
+ // handling exceptions and record the errors;
+ // for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
@@ -406,6 +406,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param maxWaitMs The maximum time in ms for waiting on the metadata
*/
private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
+ // add topic to metadata topic list if it is not there already.
+ if (!this.metadata.containsTopic(topic))
+ this.metadata.add(topic);
+
if (metadata.fetch().partitionsForTopic(topic) != null) {
return;
} else {
@@ -414,7 +418,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
- metadata.add(topic);
sender.wakeup();
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;