You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/16 19:53:00 UTC
git commit: kafka-1453 (follow-up);
Add a channel queue jmx in Mirror Maker; patched by Guozhang Wang;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk b4c1089fa -> 345c52ee3
kafka-1453 (follow-up); Add a channel queue jmx in Mirror Maker; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/345c52ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/345c52ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/345c52ee
Branch: refs/heads/trunk
Commit: 345c52ee3581847ddfaa4e53173714d8d97896c5
Parents: b4c1089
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri May 16 10:52:58 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri May 16 10:52:58 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/MirrorMaker.scala | 47 +++++++++++---------
1 file changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/345c52ee/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19df3d5..52763df 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,16 +116,22 @@ object MirrorMaker extends Logging {
val useNewProducer = options.has(useNewProducerOpt)
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
- // create data channel
- val mirrorDataChannel = new DataChannel(bufferSize)
-
// create producer threads
val producers = (1 to numProducers).map(_ => {
- if (useNewProducer)
- new NewShinyProducer(producerProps)
- else
- new OldProducer(producerProps)
- })
+ if (useNewProducer)
+ new NewShinyProducer(producerProps)
+ else
+ new OldProducer(producerProps)
+ })
+
+ // create consumer streams
+ connectors = options.valuesOf(consumerConfigOpt).toList
+ .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
+ .map(new ZookeeperConsumerConnector(_))
+ val numConsumers = connectors.size * numStreams
+
+ // create a data channel between the consumers and the producers
+ val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
producerThreads = new ListBuffer[ProducerThread]()
var producerIndex: Int = 1
@@ -135,11 +141,6 @@ object MirrorMaker extends Logging {
producerIndex += 1
}
- // create consumer streams
- connectors = options.valuesOf(consumerConfigOpt).toList
- .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
- .map(new ZookeeperConsumerConnector(_))
-
val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt))
else
@@ -154,6 +155,7 @@ object MirrorMaker extends Logging {
connectors.foreach(_.shutdown)
}
consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
+ assert(consumerThreads.size == numConsumers)
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
@@ -181,7 +183,7 @@ object MirrorMaker extends Logging {
info("Kafka mirror maker shutdown successfully")
}
- class DataChannel(capacity: Int) extends KafkaMetricsGroup {
+ class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
@@ -192,25 +194,28 @@ object MirrorMaker extends Logging {
}
)
- private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.MILLISECONDS)
- private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.MILLISECONDS)
-
+ // We use a single meter for the aggregated wait percentage of the data channel.
+ // Since a meter is calculated as total_recorded_value / time_window and
+ // time_window is independent of the number of threads, each recorded wait
+ // time should be discounted by the number of threads.
+ private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
+ private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
def put(record: ProducerRecord) {
var putSucceed = false
while (!putSucceed) {
- val startPutTime = SystemTime.milliseconds
+ val startPutTime = SystemTime.nanoseconds
putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
- waitPut.mark(SystemTime.milliseconds - startPutTime)
+ waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
}
}
def take(): ProducerRecord = {
var data: ProducerRecord = null
while (data == null) {
- val startTakeTime = SystemTime.milliseconds
+ val startTakeTime = SystemTime.nanoseconds
data = queue.poll(500, TimeUnit.MILLISECONDS)
- waitTake.mark(SystemTime.milliseconds - startTakeTime)
+ waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
}
data
}