You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/10/24 18:26:49 UTC

git commit: KAFKA-1719 Make mirror maker exit when one consumer/producer thread exits; reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 12ce4b1e1 -> 43cb192b5


KAFKA-1719 Make mirror maker exit when one consumer/producer thread exits; reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43cb192b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43cb192b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43cb192b

Branch: refs/heads/trunk
Commit: 43cb192b59ab676d35c38f9144fbc98a954a11d3
Parents: 12ce4b1
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Oct 24 09:26:39 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Oct 24 09:26:41 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 74 +++++++++++++-------
 1 file changed, 48 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/43cb192b/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b8698ee..f399105 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,26 +17,27 @@
 
 package kafka.tools
 
-import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
 import kafka.consumer._
-import kafka.serializer._
-import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
 import kafka.metrics.KafkaMetricsGroup
-
+import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}
+import kafka.serializer._
+import kafka.utils._
 import org.apache.kafka.clients.producer.ProducerRecord
 
+import java.util.Random
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit}
+
 import scala.collection.JavaConversions._
 
 import joptsimple.OptionParser
-import java.util.Random
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
 
 object MirrorMaker extends Logging {
 
   private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: Seq[ProducerThread] = null
+  private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
 
   private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
 
@@ -89,10 +90,10 @@ object MirrorMaker extends Logging {
       .ofType(classOf[String])
 
     val blacklistOpt = parser.accepts("blacklist",
-            "Blacklist of topics to mirror.")
-            .withRequiredArg()
-            .describedAs("Java regex (String)")
-            .ofType(classOf[String])
+      "Blacklist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
 
     val helpOpt = parser.accepts("help", "Print this message.")
     
@@ -173,19 +174,21 @@ object MirrorMaker extends Logging {
   }
 
   def cleanShutdown() {
-    if (connectors != null) connectors.foreach(_.shutdown)
-    if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
-    if (producerThreads != null) {
-      producerThreads.foreach(_.shutdown)
-      producerThreads.foreach(_.awaitShutdown)
+    if (isShuttingdown.compareAndSet(false, true)) {
+      if (connectors != null) connectors.foreach(_.shutdown)
+      if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+      if (producerThreads != null) {
+        producerThreads.foreach(_.shutdown)
+        producerThreads.foreach(_.awaitShutdown)
+      }
+      info("Kafka mirror maker shutdown successfully")
     }
-    info("Kafka mirror maker shutdown successfully")
   }
 
-  class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
+  class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup {
 
-    val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
-    for (i <- 0 until numConsumers)
+    val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs)
+    for (i <- 0 until numOutputs)
       queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
 
     private val counter = new AtomicInteger(new Random().nextInt())
@@ -203,17 +206,21 @@ object MirrorMaker extends Logging {
       // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
       val queueId =
         if(record.key() != null) {
-          Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
+          Utils.abs(java.util.Arrays.hashCode(record.key())) % numOutputs
         } else {
-          Utils.abs(counter.getAndIncrement()) % numConsumers
+          Utils.abs(counter.getAndIncrement()) % numOutputs
         }
+      put(record, queueId)
+    }
+
+    def put(record: ProducerRecord, queueId: Int) {
       val queue = queues(queueId)
 
       var putSucceed = false
       while (!putSucceed) {
         val startPutTime = SystemTime.nanoseconds
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
-        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
+        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
       }
       channelSizeHist.update(queue.size)
     }
@@ -224,7 +231,7 @@ object MirrorMaker extends Logging {
       while (data == null) {
         val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
-        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
+        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
       }
       channelSizeHist.update(queue.size)
       data
@@ -238,6 +245,7 @@ object MirrorMaker extends Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-consumer-" + threadId
+    private var isCleanShutdown: Boolean = true
     this.logIdent = "[%s] ".format(threadName)
 
     this.setName(threadName)
@@ -250,11 +258,18 @@ object MirrorMaker extends Logging {
           mirrorDataChannel.put(data)
         }
       } catch {
-        case e: Throwable =>
+        case e: Throwable => {
           fatal("Stream unexpectedly exited.", e)
+          isCleanShutdown = false
+        }
       } finally {
         shutdownLatch.countDown()
         info("Consumer thread stopped")
+        // If it exits accidentally, stop the entire mirror maker.
+        if (!isCleanShutdown) {
+          fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
+          System.exit(-1)
+        }
       }
     }
 
@@ -273,6 +288,7 @@ object MirrorMaker extends Logging {
                         val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
     private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    private var isCleanShutdown: Boolean = true
     this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
@@ -292,17 +308,23 @@ object MirrorMaker extends Logging {
       } catch {
         case t: Throwable => {
           fatal("Producer thread failure due to ", t)
+          isCleanShutdown = false
         }
       } finally {
         shutdownComplete.countDown
         info("Producer thread stopped")
+        // If it exits accidentally, stop the entire mirror maker.
+        if (!isCleanShutdown) {
+          fatal("Producer thread exited abnormally, stopping the whole mirror maker.")
+          System.exit(-1)
+        }
       }
     }
 
     def shutdown {
       try {
         info("Producer thread " + threadName + " shutting down")
-        dataChannel.put(shutdownMessage)
+        dataChannel.put(shutdownMessage, threadId)
       }
       catch {
         case ie: InterruptedException => {