Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:06 UTC

[09/37] git commit: KAFKA-1515 Fix a bug that could result in blocking for a long period of time in the producer. Patch from Guozhang Wang.

KAFKA-1515 Fix a bug that could result in blocking for a long period of time in the producer. Patch from Guozhang Wang.
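
In outline, the patch drops the blocking Metadata.fetch(topic, maxWaitMs) and replaces it with a
version handshake: requestUpdate() flags the metadata as stale and returns the version the caller
saw, awaitUpdate(lastVersion, maxWaitMs) blocks on the Metadata monitor until the version has been
bumped or the deadline passes, and update() installs the new Cluster, increments the version and
calls notifyAll(). A minimal, self-contained sketch of that handshake is below; the class and
field names are illustrative only, and the real Metadata additionally tracks topics, the refresh
backoff and the expiry time.

    import java.util.concurrent.TimeoutException;

    // Sketch of the version-gated wait the patched Metadata class uses.
    public final class VersionedState<T> {
        private T value;
        private int version = 0;
        private boolean needUpdate = false; // polled by the refresher, cf. Metadata.timeToNextUpdate()

        // Flag the state as stale and return the version the caller observed.
        public synchronized int requestUpdate() {
            this.needUpdate = true;
            return this.version;
        }

        // Block until the version advances past lastVersion, or time out.
        public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
                throws InterruptedException, TimeoutException {
            long begin = System.currentTimeMillis();
            long remainingWaitMs = maxWaitMs;
            while (this.version <= lastVersion) {
                wait(remainingWaitMs);
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("No update after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }

        // Install a new value, bump the version and wake every waiter.
        public synchronized void update(T newValue) {
            this.value = newValue;
            this.needUpdate = false;
            this.version += 1;
            notifyAll();
        }

        public synchronized T get() {
            return this.value;
        }
    }

Call sites that merely notice stale metadata (NetworkClient on disconnect, Sender on unknown
leaders or an InvalidMetadataException) now just call requestUpdate() and return, while
KafkaProducer.waitOnMetadata pairs requestUpdate() with awaitUpdate() in a loop until the topic's
partitions show up or metadataFetchTimeoutMs is exhausted.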


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd3ce27d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd3ce27d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd3ce27d

Branch: refs/heads/transactional_messaging
Commit: cd3ce27d4baf5434676ec040d64663ad3ce09817
Parents: 6de56b3
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Jul 8 13:16:56 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Jul 8 13:16:56 2014 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  4 +-
 .../kafka/clients/producer/KafkaProducer.java   | 40 ++++++++++--
 .../clients/producer/internals/Metadata.java    | 68 +++++++++-----------
 .../clients/producer/internals/Sender.java      |  4 +-
 .../kafka/clients/producer/MetadataTest.java    |  9 ++-
 5 files changed, 77 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d21f922..f739279 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -309,7 +309,7 @@ public class NetworkClient implements KafkaClient {
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
-            this.metadata.forceUpdate();
+            this.metadata.requestUpdate();
     }
 
     /**
@@ -375,7 +375,7 @@ public class NetworkClient implements KafkaClient {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(node.id());
             /* maybe the problem is our metadata, update it */
-            metadata.forceUpdate();
+            metadata.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d85ca30..f58b850 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -73,6 +74,7 @@ public class KafkaProducer implements Producer {
     private final Thread ioThread;
     private final CompressionType compressionType;
     private final Sensor errors;
+    private final Time time;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -94,7 +96,7 @@ public class KafkaProducer implements Producer {
 
     private KafkaProducer(ProducerConfig config) {
         log.trace("Starting the Kafka producer");
-        Time time = new SystemTime();
+        this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                                                                   TimeUnit.MILLISECONDS);
@@ -119,7 +121,7 @@ public class KafkaProducer implements Producer {
                                                  metrics,
                                                  time);
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), 0);
+        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
 
         NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
                                                  this.metadata,
@@ -225,8 +227,9 @@ public class KafkaProducer implements Producer {
     @Override
     public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
         try {
-            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
-            int partition = partitioner.partition(record, cluster);
+            // first make sure the metadata for the topic is available
+            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
+            int partition = partitioner.partition(record, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -256,6 +259,31 @@ public class KafkaProducer implements Producer {
     }
 
     /**
+     * Wait for cluster metadata including partitions for the given topic to be available.
+     * @param topic The topic we want metadata for
+     * @param maxWaitMs The maximum time in ms for waiting on the metadata
+     */
+    private void waitOnMetadata(String topic, long maxWaitMs) {
+        if (metadata.fetch().partitionsForTopic(topic) != null) {
+            return;
+        } else {
+            long begin = time.milliseconds();
+            long remainingWaitMs = maxWaitMs;
+            while (metadata.fetch().partitionsForTopic(topic) == null) {
+                log.trace("Requesting metadata update for topic {}.", topic);
+                int version = metadata.requestUpdate();
+                metadata.add(topic);
+                sender.wakeup();
+                metadata.awaitUpdate(version, remainingWaitMs);
+                long elapsed = time.milliseconds() - begin;
+                if (elapsed >= maxWaitMs)
+                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+                remainingWaitMs = maxWaitMs - elapsed;
+            }
+        }
+    }
+
+    /**
      * Validate that the record size isn't too large
      */
     private void ensureValidRecordSize(int size) {
@@ -271,8 +299,10 @@ public class KafkaProducer implements Producer {
                                               " configuration.");
     }
 
+    @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
+        waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        return this.metadata.fetch().partitionsForTopic(topic);
     }
 
     @Override

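With waitOnMetadata in place, send() and partitionsFor() wait at most metadataFetchTimeoutMs for
the topic's metadata and otherwise fail with a TimeoutException, instead of looping inside
Metadata.fetch(). A rough caller-side sketch against the (pre-generics, byte[]-valued) producer
API on this branch; the "metadata.fetch.timeout.ms" key and the exact way send() surfaces a
timeout are assumptions here, not something this patch shows.

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;

    public class ProducerMetadataWaitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Assumed config key for the bound used by waitOnMetadata; check ProducerConfig for the exact name.
            props.put("metadata.fetch.timeout.ms", "5000");

            KafkaProducer producer = new KafkaProducer(props);
            try {
                // Blocks for at most the metadata fetch timeout while partitions for the topic are fetched.
                List<PartitionInfo> partitions = producer.partitionsFor("some-topic");
                System.out.println("partitions: " + partitions);

                // send() performs the same bounded metadata wait before picking a partition; a timeout
                // here may surface as a thrown exception or as a failed future, depending on send()'s
                // error handling.
                producer.send(new ProducerRecord("some-topic", "key".getBytes(), "value".getBytes()));
            } catch (TimeoutException e) {
                // Thrown by waitOnMetadata when no metadata arrives within the timeout.
                System.err.println("Metadata not available in time: " + e.getMessage());
            } finally {
                producer.close();
            }
        }
    }
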
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 8890aa2..140237f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -13,11 +13,9 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,9 +34,10 @@ public final class Metadata {
 
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
+    private int version;
     private long lastRefreshMs;
     private Cluster cluster;
-    private boolean forceUpdate;
+    private boolean needUpdate;
     private final Set<String> topics;
 
     /**
@@ -58,8 +57,9 @@ public final class Metadata {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.lastRefreshMs = 0L;
+        this.version = 0;
         this.cluster = Cluster.empty();
-        this.forceUpdate = false;
+        this.needUpdate = false;
         this.topics = new HashSet<String>();
     }
 
@@ -71,33 +71,10 @@ public final class Metadata {
     }
 
     /**
-     * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
-     * block waiting for an update.
-     * @param topic The topic we want metadata for
-     * @param maxWaitMs The maximum amount of time to block waiting for metadata
+     * Add the topic to the set of topics maintained in the metadata
      */
-    public synchronized Cluster fetch(String topic, long maxWaitMs) {
-        List<PartitionInfo> partitions = null;
-        long begin = System.currentTimeMillis();
-        long remainingWaitMs = maxWaitMs;
-        do {
-            partitions = cluster.partitionsForTopic(topic);
-            if (partitions == null) {
-                topics.add(topic);
-                forceUpdate = true;
-                try {
-                    log.trace("Requesting metadata update for topic {}.", topic);
-                    wait(remainingWaitMs);
-                } catch (InterruptedException e) { /* this is fine, just try again */
-                }
-                long elapsed = System.currentTimeMillis() - begin;
-                if (elapsed >= maxWaitMs)
-                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-                remainingWaitMs = maxWaitMs - elapsed;
-            } else {
-                return cluster;
-            }
-        } while (true);
+    public synchronized void add(String topic) {
+        topics.add(topic);
     }
 
     /**
@@ -106,16 +83,35 @@ public final class Metadata {
      * been requested then the expiry time is now
      */
     public synchronized long timeToNextUpdate(long nowMs) {
-        long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+        long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
         long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
         return Math.max(timeToExpire, timeToAllowUpdate);
     }
 
     /**
-     * Force an update of the current cluster info
+     * Request an update of the current cluster metadata info, return the current version before the update
      */
-    public synchronized void forceUpdate() {
-        this.forceUpdate = true;
+    public synchronized int requestUpdate() {
+        this.needUpdate = true;
+        return this.version;
+    }
+
+    /**
+     * Wait for metadata update until the current version is larger than the last version we know of
+     */
+    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) {
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        while (this.version <= lastVersion) {
+            try {
+                wait(remainingWaitMs);
+            } catch (InterruptedException e) { /* this is fine */
+            }
+            long elapsed = System.currentTimeMillis() - begin;
+            if (elapsed >= maxWaitMs)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            remainingWaitMs = maxWaitMs - elapsed;
+        }
     }
 
     /**
@@ -129,8 +125,9 @@ public final class Metadata {
      * Update the cluster metadata
      */
     public synchronized void update(Cluster cluster, long now) {
-        this.forceUpdate = false;
+        this.needUpdate = false;
         this.lastRefreshMs = now;
+        this.version += 1;
         this.cluster = cluster;
         notifyAll();
         log.debug("Updated cluster metadata to {}", cluster);
@@ -142,5 +139,4 @@ public final class Metadata {
     public synchronized long lastUpdate() {
         return this.lastRefreshMs;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 37b9d1a..a016269 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -147,7 +147,7 @@ public class Sender implements Runnable {
 
         // if there are any partitions whose leaders are not known yet, force metadata update
         if (result.unknownLeadersExist)
-            this.metadata.forceUpdate();
+            this.metadata.requestUpdate();
 
         // remove any nodes we aren't ready to send to
         Iterator<Node> iter = result.readyNodes.iterator();
@@ -252,7 +252,7 @@ public class Sender implements Runnable {
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         }
         if (error.exception() instanceof InvalidMetadataException)
-            metadata.forceUpdate();
+            metadata.requestUpdate();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3ce27d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 0d7d04c..543304c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -31,7 +31,7 @@ public class MetadataTest {
         long time = 0;
         metadata.update(Cluster.empty(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
-        metadata.forceUpdate();
+        metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
         time += refreshBackoffMs;
         assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
@@ -40,7 +40,9 @@ public class MetadataTest {
         Thread t2 = asyncFetch(topic);
         assertTrue("Awaiting update", t1.isAlive());
         assertTrue("Awaiting update", t2.isAlive());
-        metadata.update(TestUtils.singletonCluster(topic, 1), time);
+        // keep updating the metadata until no further update is needed
+        while (metadata.timeToNextUpdate(time) == 0)
+            metadata.update(TestUtils.singletonCluster(topic, 1), time);
         t1.join();
         t2.join();
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
@@ -51,7 +53,8 @@ public class MetadataTest {
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
-                metadata.fetch(topic, Integer.MAX_VALUE);
+                while (metadata.fetch().partitionsForTopic(topic) == null)
+                    metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE);
             }
         };
         thread.start();