You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/07/11 22:09:52 UTC

git commit: KAFKA-1515 Producer can hang during metadata updates. Patch by Guozhang.

Repository: kafka
Updated Branches:
  refs/heads/trunk 8034390ef -> 83a9aa55d


KAFKA-1515 Producer can hang during metadata updates. Patch by Guozhang.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83a9aa55
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83a9aa55
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83a9aa55

Branch: refs/heads/trunk
Commit: 83a9aa55d340f6b2720394a49f01a88509e17e52
Parents: 8034390
Author: Jay Kreps <ja...@gmail.com>
Authored: Fri Jul 11 13:08:24 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Fri Jul 11 13:08:24 2014 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 33 +++++++++++++-------
 .../clients/producer/internals/Metadata.java    |  2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 31 +++++++++---------
 3 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index f739279..d8f9ce6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -122,16 +122,21 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean isReady(Node node, long now) {
-        return isReady(node.id(), now);
-    }
-
-    private boolean isReady(int node, long now) {
+        int nodeId = node.id();
         if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
             // if we need to update our metadata now declare all requests unready to make metadata requests first priority
             return false;
         else
             // otherwise we are ready if we are connected and can send more requests
-            return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+            return isSendable(nodeId);
+    }
+
+    /**
+     * Are we connected and ready and able to send more requests to the given node?
+     * @param node The node
+     */
+    private boolean isSendable(int node) {
+        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
     }
 
     /**
@@ -146,21 +151,21 @@ public class NetworkClient implements KafkaClient {
     public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
 
-        // should we update our metadata?
-        long metadataTimeout = metadata.timeToNextUpdate(now);
-        if (!this.metadataFetchInProgress && metadataTimeout == 0)
-            maybeUpdateMetadata(sends, now);
-
         for (int i = 0; i < requests.size(); i++) {
             ClientRequest request = requests.get(i);
             int nodeId = request.request().destination();
-            if (!isReady(nodeId, now))
+            if (!isSendable(nodeId))
                 throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
 
             this.inFlightRequests.add(request);
             sends.add(request.request());
         }
 
+        // should we update our metadata?
+        long metadataTimeout = metadata.timeToNextUpdate(now);
+        if (!this.metadataFetchInProgress && metadataTimeout == 0)
+            maybeUpdateMetadata(sends, now);
+
         // do the I/O
         try {
             this.selector.poll(Math.min(timeout, metadataTimeout), sends);
@@ -347,9 +352,12 @@ public class NetworkClient implements KafkaClient {
      */
     private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
         Node node = this.leastLoadedNode(now);
-        if (node == null)
+        if (node == null) {
+            log.debug("Give up sending metadata request since no node is available");
             return;
+        }
 
+        log.debug("Trying to send metadata request to node {}", node.id());
         if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
@@ -359,6 +367,7 @@ public class NetworkClient implements KafkaClient {
             this.inFlightRequests.add(metadataRequest);
         } else if (connectionStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
+            log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id());
             initiateConnect(node, now);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 140237f..4aa5b01 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -130,7 +130,7 @@ public final class Metadata {
         this.version += 1;
         this.cluster = cluster;
         notifyAll();
-        log.debug("Updated cluster metadata to {}", cluster);
+        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/83a9aa55/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4f06e34..555d751 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,19 +116,6 @@ object MirrorMaker extends Logging {
     val numStreams = options.valueOf(numStreamsOpt).intValue()
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
-    val useNewProducer = options.has(useNewProducerOpt)
-    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
-
-    // create producer threads
-    val clientId = producerProps.getProperty("client.id", "")
-    val producers = (1 to numProducers).map(i => {
-      producerProps.setProperty("client.id", clientId + "-" + i)
-      if (useNewProducer)
-        new NewShinyProducer(producerProps)
-      else
-        new OldProducer(producerProps)
-    })
-
     // create consumer streams
     connectors = options.valuesOf(consumerConfigOpt).toList
       .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
@@ -138,8 +125,21 @@ object MirrorMaker extends Logging {
     // create a data channel btw the consumers and the producers
     val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
 
-    producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2))
+    // create producer threads
+    val useNewProducer = options.has(useNewProducerOpt)
+    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+    val clientId = producerProps.getProperty("client.id", "")
+    producerThreads = (0 until numProducers).map(i => {
+      producerProps.setProperty("client.id", clientId + "-" + i)
+      val producer =
+      if (useNewProducer)
+        new NewShinyProducer(producerProps)
+      else
+        new OldProducer(producerProps)
+      new ProducerThread(mirrorDataChannel, producer, i)
+    })
 
+    // create consumer threads
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
@@ -153,7 +153,7 @@ object MirrorMaker extends Logging {
         fatal("Unable to create stream - shutting down mirror maker.")
         connectors.foreach(_.shutdown)
     }
-    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
+    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
     assert(consumerThreads.size == numConsumers)
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -233,7 +233,6 @@ object MirrorMaker extends Logging {
 
   class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                        mirrorDataChannel: DataChannel,
-                       producers: Seq[BaseProducer],
                        threadId: Int)
           extends Thread with Logging with KafkaMetricsGroup {