You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/16 19:53:00 UTC

git commit: kafka-1453 (follow-up); Add a channel queue jmx in Mirror Maker; patched by Guozhang Wang; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk b4c1089fa -> 345c52ee3


kafka-1453 (follow-up); Add a channel queue jmx in Mirror Maker;  patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/345c52ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/345c52ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/345c52ee

Branch: refs/heads/trunk
Commit: 345c52ee3581847ddfaa4e53173714d8d97896c5
Parents: b4c1089
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri May 16 10:52:58 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri May 16 10:52:58 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 47 +++++++++++---------
 1 file changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/345c52ee/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19df3d5..52763df 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,16 +116,22 @@ object MirrorMaker extends Logging {
     val useNewProducer = options.has(useNewProducerOpt)
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
 
-    // create data channel
-    val mirrorDataChannel = new DataChannel(bufferSize)
-
     // create producer threads
     val producers = (1 to numProducers).map(_ => {
-        if (useNewProducer)
-          new NewShinyProducer(producerProps)
-        else
-          new OldProducer(producerProps)
-      })
+      if (useNewProducer)
+        new NewShinyProducer(producerProps)
+      else
+        new OldProducer(producerProps)
+    })
+
+    // create consumer streams
+    connectors = options.valuesOf(consumerConfigOpt).toList
+      .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
+      .map(new ZookeeperConsumerConnector(_))
+    val numConsumers = connectors.size * numStreams
+
+    // create a data channel btw the consumers and the producers
+    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
 
     producerThreads = new ListBuffer[ProducerThread]()
     var producerIndex: Int = 1
@@ -135,11 +141,6 @@ object MirrorMaker extends Logging {
       producerIndex += 1
     }
 
-    // create consumer streams
-    connectors = options.valuesOf(consumerConfigOpt).toList
-            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
-            .map(new ZookeeperConsumerConnector(_))
-
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
@@ -154,6 +155,7 @@ object MirrorMaker extends Logging {
         connectors.foreach(_.shutdown)
     }
     consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
+    assert(consumerThreads.size == numConsumers)
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -181,7 +183,7 @@ object MirrorMaker extends Logging {
     info("Kafka mirror maker shutdown successfully")
   }
 
-  class DataChannel(capacity: Int) extends KafkaMetricsGroup {
+  class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
 
     val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
 
@@ -192,25 +194,28 @@ object MirrorMaker extends Logging {
       }
     )
 
-    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.MILLISECONDS)
-    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.MILLISECONDS)
-
+    // We use a single meter for aggregated wait percentage for the data channel.
+    // Since meter is calculated as total_recorded_value / time_window and
+    // time_window is independent of the number of threads, each recorded wait
+    // time should be discounted by # threads.
+    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
+    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
 
     def put(record: ProducerRecord) {
       var putSucceed = false
       while (!putSucceed) {
-        val startPutTime = SystemTime.milliseconds
+        val startPutTime = SystemTime.nanoseconds
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
-        waitPut.mark(SystemTime.milliseconds - startPutTime)
+        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
       }
     }
 
     def take(): ProducerRecord = {
       var data: ProducerRecord = null
       while (data == null) {
-        val startTakeTime = SystemTime.milliseconds
+        val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
-        waitTake.mark(SystemTime.milliseconds - startTakeTime)
+        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
       }
       data
     }