You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/10/24 18:26:49 UTC
git commit: KAFKA-1719 Make mirror maker exit when one
consumer/producer thread exits;
reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 12ce4b1e1 -> 43cb192b5
KAFKA-1719 Make mirror maker exit when one consumer/producer thread exits; reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43cb192b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43cb192b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43cb192b
Branch: refs/heads/trunk
Commit: 43cb192b59ab676d35c38f9144fbc98a954a11d3
Parents: 12ce4b1
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Oct 24 09:26:39 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Oct 24 09:26:41 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/MirrorMaker.scala | 74 +++++++++++++-------
1 file changed, 48 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/43cb192b/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b8698ee..f399105 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,26 +17,27 @@
package kafka.tools
-import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
import kafka.consumer._
-import kafka.serializer._
-import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
import kafka.metrics.KafkaMetricsGroup
-
+import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}
+import kafka.serializer._
+import kafka.utils._
import org.apache.kafka.clients.producer.ProducerRecord
+import java.util.Random
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit}
+
import scala.collection.JavaConversions._
import joptsimple.OptionParser
-import java.util.Random
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
object MirrorMaker extends Logging {
private var connectors: Seq[ZookeeperConsumerConnector] = null
private var consumerThreads: Seq[ConsumerThread] = null
private var producerThreads: Seq[ProducerThread] = null
+ private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
@@ -89,10 +90,10 @@ object MirrorMaker extends Logging {
.ofType(classOf[String])
val blacklistOpt = parser.accepts("blacklist",
- "Blacklist of topics to mirror.")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
+ "Blacklist of topics to mirror.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(classOf[String])
val helpOpt = parser.accepts("help", "Print this message.")
@@ -173,19 +174,21 @@ object MirrorMaker extends Logging {
}
def cleanShutdown() {
- if (connectors != null) connectors.foreach(_.shutdown)
- if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
- if (producerThreads != null) {
- producerThreads.foreach(_.shutdown)
- producerThreads.foreach(_.awaitShutdown)
+ if (isShuttingdown.compareAndSet(false, true)) {
+ if (connectors != null) connectors.foreach(_.shutdown)
+ if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+ if (producerThreads != null) {
+ producerThreads.foreach(_.shutdown)
+ producerThreads.foreach(_.awaitShutdown)
+ }
+ info("Kafka mirror maker shutdown successfully")
}
- info("Kafka mirror maker shutdown successfully")
}
- class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
+ class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup {
- val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
- for (i <- 0 until numConsumers)
+ val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs)
+ for (i <- 0 until numOutputs)
queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
private val counter = new AtomicInteger(new Random().nextInt())
@@ -203,17 +206,21 @@ object MirrorMaker extends Logging {
// Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
val queueId =
if(record.key() != null) {
- Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
+ Utils.abs(java.util.Arrays.hashCode(record.key())) % numOutputs
} else {
- Utils.abs(counter.getAndIncrement()) % numConsumers
+ Utils.abs(counter.getAndIncrement()) % numOutputs
}
+ put(record, queueId)
+ }
+
+ def put(record: ProducerRecord, queueId: Int) {
val queue = queues(queueId)
var putSucceed = false
while (!putSucceed) {
val startPutTime = SystemTime.nanoseconds
putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
- waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
+ waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
}
channelSizeHist.update(queue.size)
}
@@ -224,7 +231,7 @@ object MirrorMaker extends Logging {
while (data == null) {
val startTakeTime = SystemTime.nanoseconds
data = queue.poll(500, TimeUnit.MILLISECONDS)
- waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
+ waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
}
channelSizeHist.update(queue.size)
data
@@ -238,6 +245,7 @@ object MirrorMaker extends Logging {
private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-consumer-" + threadId
+ private var isCleanShutdown: Boolean = true
this.logIdent = "[%s] ".format(threadName)
this.setName(threadName)
@@ -250,11 +258,18 @@ object MirrorMaker extends Logging {
mirrorDataChannel.put(data)
}
} catch {
- case e: Throwable =>
+ case e: Throwable => {
fatal("Stream unexpectedly exited.", e)
+ isCleanShutdown = false
+ }
} finally {
shutdownLatch.countDown()
info("Consumer thread stopped")
+ // If it exits accidentally, stop the entire mirror maker.
+ if (!isCleanShutdown) {
+ fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
+ System.exit(-1)
+ }
}
}
@@ -273,6 +288,7 @@ object MirrorMaker extends Logging {
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-producer-" + threadId
private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+ private var isCleanShutdown: Boolean = true
this.logIdent = "[%s] ".format(threadName)
setName(threadName)
@@ -292,17 +308,23 @@ object MirrorMaker extends Logging {
} catch {
case t: Throwable => {
fatal("Producer thread failure due to ", t)
+ isCleanShutdown = false
}
} finally {
shutdownComplete.countDown
info("Producer thread stopped")
+ // If it exits accidentally, stop the entire mirror maker.
+ if (!isCleanShutdown) {
+ fatal("Producer thread exited abnormally, stopping the whole mirror maker.")
+ System.exit(-1)
+ }
}
}
def shutdown {
try {
info("Producer thread " + threadName + " shutting down")
- dataChannel.put(shutdownMessage)
+ dataChannel.put(shutdownMessage, threadId)
}
catch {
case ie: InterruptedException => {