Posted to commits@kafka.apache.org by jj...@apache.org on 2013/05/23 01:26:26 UTC

git commit: KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede

Updated Branches:
  refs/heads/0.8 32cd8994b -> ffd84eb23


KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd84eb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd84eb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd84eb2

Branch: refs/heads/0.8
Commit: ffd84eb23fcbb279a63ba5d4cb72077a0c079cff
Parents: 32cd899
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed May 22 16:26:04 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed May 22 16:26:04 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala    |   28 ++++++++++++-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   32 ++++++++++----
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 658b5c1..db104f1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -85,13 +85,32 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                   addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
                   noLeaderPartitionSet -= topicAndPartition
               } catch {
-                case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+                case t => {
+                  /*
+                   * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
+                   * processing subsequent partitions without leader so the leader-finder-thread can exit.
+                   * It is unfortunate that we depend on the following behavior and we should redesign this: as part of
+                   * processing partitions, we catch the InterruptedException (thrown from addPartition's call to
+                   * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
+                   * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
+                   * and we can run into the deadlock described in KAFKA-914.
+                   */
+                  if (!isRunning.get())
+                    throw t
+                  else
+                    warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+                }
               }
           }
 
           shutdownIdleFetcherThreads()
         } catch {
-          case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+          case t => {
+            if (!isRunning.get())
+              throw t /* See above for why we need to propagate this exception. */
+            else
+              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+          }
         }
       } finally {
         lock.unlock()
@@ -122,6 +141,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   def stopConnections() {
+    /*
+     * Stop the leader finder thread first before stopping fetchers. Otherwise, if there are more partitions without
+     * leader, then the leader finder thread will process these partitions (before shutting down) and add fetchers for
+     * these partitions.
+     */
     info("Stopping leader finder thread")
     if (leaderFinderThread != null) {
       leaderFinderThread.shutdown()
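
The comment added in the hunk above hinges on a subtle property of
java.util.concurrent.locks: lockInterruptibly() throws InterruptedException
only while the thread's interrupted status is set, and catching that
exception clears the status. The standalone sketch below (illustrative only,
not Kafka code; the class name and timings are made up) shows how one catch
block swallows a shutdown interrupt, after which a later lockInterruptibly()
on the same thread blocks as if the interrupt had never happened. This is the
mechanism behind the KAFKA-914 deadlock:

import java.util.concurrent.locks.ReentrantLock

object InterruptFlagDemo {
  def main(args: Array[String]) {
    val lock = new ReentrantLock()
    lock.lock()  // main thread holds the lock so the worker must block

    val worker = new Thread(new Runnable {
      def run() {
        try {
          lock.lockInterruptibly()  // parks; throws once we are interrupted
          lock.unlock()
        } catch {
          case e: InterruptedException =>
            // Catching the exception CLEARS the interrupted flag.
            println("flag after catch: " + Thread.currentThread().isInterrupted)  // false
        }
        // The shutdown request is now lost: this second interruptible
        // acquisition simply waits for the lock and then succeeds.
        lock.lockInterruptibly()
        println("acquired lock after the interrupt was swallowed")
        lock.unlock()
      }
    })

    worker.start()
    Thread.sleep(200)   // let the worker park on the lock
    worker.interrupt()  // request shutdown
    Thread.sleep(200)
    lock.unlock()       // worker now proceeds as if never interrupted
    worker.join()
  }
}

The usual idiom is to restore the flag with Thread.currentThread().interrupt()
inside the catch; the patch instead rethrows when isRunning is false, which
likewise lets the leader-finder-thread exit. The ordering note added to
stopConnections() matters for the same reason: if fetchers were stopped
first, a still-running leader finder could re-add fetchers for leaderless
partitions before it is itself shut down.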

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 3d22dc7..2d93947 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -28,7 +28,6 @@ import collection.mutable.ListBuffer
 import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
 import kafka.javaapi
 
-
 object MirrorMaker extends Logging {
 
   def main(args: Array[String]) {
@@ -114,23 +113,33 @@ object MirrorMaker extends Logging {
     else
       new Blacklist(options.valueOf(blacklistOpt))
 
-    val streams =
-      connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
+    try {
+      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
+    } catch {
+      case t =>
+        fatal("Unable to create stream - shutting down mirror maker.")
+        connectors.foreach(_.shutdown)
+    }
 
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
     val consumerThreads =
-      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
 
     val producerThreads = new ListBuffer[ProducerThread]()
 
+    def cleanShutdown() {
+      connectors.foreach(_.shutdown)
+      consumerThreads.foreach(_.awaitShutdown)
+      producerThreads.foreach(_.shutdown)
+      producerThreads.foreach(_.awaitShutdown)
+      info("Kafka mirror maker shutdown successfully")
+    }
+
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
-        connectors.foreach(_.shutdown)
-        consumerThreads.foreach(_.awaitShutdown)
-        producerThreads.foreach(_.shutdown)
-        producerThreads.foreach(_.awaitShutdown)
-        logger.info("Kafka migration tool shutdown successfully");
+        cleanShutdown()
       }
     })
 
@@ -145,6 +154,10 @@ object MirrorMaker extends Logging {
 
     consumerThreads.foreach(_.start)
     producerThreads.foreach(_.start)
+
+    // in case the consumer threads hit a timeout/other exception
+    consumerThreads.foreach(_.awaitShutdown)
+    cleanShutdown()
   }
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
@@ -158,6 +171,7 @@ object MirrorMaker extends Logging {
     this.setName(threadName)
 
     override def run() {
+      info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
           val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
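
The MirrorMaker half of the patch converges on a single cleanShutdown()
shared by the JVM shutdown hook and the normal exit path, with main()
blocking on the consumer threads so that a thread dying from a consumer
timeout also tears the whole pipeline down. The sketch below distills that
shape only; Worker, the bounded run loop, and the AtomicBoolean guard are
placeholders of mine, not Kafka code (the patch itself calls the connector
and thread shutdown methods directly, with no explicit guard):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Placeholder standing in for MirrorMakerThread: counts down a latch when
// its run loop exits, for any reason, so callers can block on it.
class Worker(id: Int) extends Thread("demo-worker-" + id) {
  private val shutdownLatch = new CountDownLatch(1)
  @volatile var keepRunning = true

  override def run() {
    try {
      // Stand-in for consuming a stream; exits after a while to simulate
      // a consumer timeout, the case the patch handles in main().
      var i = 0
      while (keepRunning && i < 10) { Thread.sleep(100); i += 1 }
    } catch {
      case e: InterruptedException => ()  // interrupt doubles as shutdown signal
    } finally {
      shutdownLatch.countDown()
    }
  }

  def shutdown() { keepRunning = false; interrupt() }
  def awaitShutdown() { shutdownLatch.await() }
}

object CleanShutdownDemo {
  def main(args: Array[String]) {
    val workers = (0 until 4).map(new Worker(_))
    val alreadyDone = new AtomicBoolean(false)

    // Shared by the shutdown hook and the normal exit path; the guard
    // makes whichever caller comes second a no-op.
    def cleanShutdown() {
      if (alreadyDone.compareAndSet(false, true)) {
        workers.foreach(_.shutdown())
        workers.foreach(_.awaitShutdown())
        println("demo shutdown complete")
      }
    }

    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() { cleanShutdown() }
    })

    workers.foreach(_.start())
    workers.foreach(_.awaitShutdown())  // returns if the workers die on their own
    cleanShutdown()
  }
}

The guard matters because the shutdown hook still fires when main() exits
normally after calling cleanShutdown() itself; in the patch the second
invocation instead relies on the connector and thread shutdown calls being
safe to repeat.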