You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/11 06:40:52 UTC
git commit: KAFKA-1237 mirror maker using 08 consumer and 09 producer;
reviewed by Jay Kreps and Joel Koshy
Updated Branches:
refs/heads/trunk 6b80dbb97 -> 71e21c6ba
KAFKA-1237 mirror maker using 08 consumer and 09 producer; reviewed by Jay Kreps and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71e21c6b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71e21c6b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71e21c6b
Branch: refs/heads/trunk
Commit: 71e21c6bacb75bf7b5f65dc67c921d4c7583fa12
Parents: 6b80dbb
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Feb 10 21:40:34 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Feb 10 21:40:34 2014 -0800
----------------------------------------------------------------------
build.gradle | 1 +
.../kafka/tools/newproducer/MirrorMaker.scala | 184 +++++++++++++++++++
2 files changed, 185 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/71e21c6b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 858d297..d9d6e6a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -136,6 +136,7 @@ project(':core') {
}
dependencies {
+ compile project(':clients')
compile "org.scala-lang:scala-library:$scalaVersion"
compile 'org.apache.zookeeper:zookeeper:3.3.4'
compile 'com.101tec:zkclient:0.3'
http://git-wip-us.apache.org/repos/asf/kafka/blob/71e21c6b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
new file mode 100644
index 0000000..faa07e9
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools.newproducer
+
+import joptsimple.OptionParser
+import kafka.utils.{Utils, CommandLineUtils, Logging}
+import java.util.concurrent.CountDownLatch
+import kafka.consumer._
+import collection.mutable.ListBuffer
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
+import java.util.concurrent.atomic.AtomicInteger
+
+
+/**
+ * Mirror maker that reads from a source cluster with a ZookeeperConsumerConnector
+ * (the 0.8 consumer) and re-publishes every message with the new KafkaProducer.
+ *
+ * One [[MirrorMakerThread]] is started per consumer stream and one producer is created
+ * per stream; all producers are shared by the threads through a [[ProducerDataChannel]].
+ */
+object MirrorMaker extends Logging {
+
+  // Assigned in main() and read from the JVM shutdown-hook thread, hence @volatile.
+  @volatile private var connector: ZookeeperConsumerConnector = null
+  @volatile private var mirroringThreads: Seq[MirrorMakerThread] = null
+  @volatile private var producerChannel: ProducerDataChannel = null
+  // cleanShutdown() is invoked both at the end of main() and by the shutdown hook;
+  // this flag makes sure the connector and producers are only shut down once.
+  private val shutdownStarted = new java.util.concurrent.atomic.AtomicBoolean(false)
+
+  /**
+   * Parses the command line, creates the consumer streams and one producer per stream,
+   * starts one mirroring thread per stream and blocks until all of them have stopped.
+   * Exits the JVM with status 1 on invalid arguments or if the streams cannot be created.
+   */
+  def main(args: Array[String]) {
+    info("Starting mirror maker")
+    val parser = new OptionParser
+
+    // Only a single value of this option is read below (options.valueOf).
+    val consumerConfigOpt = parser.accepts("consumer.config",
+      "Consumer config to consume from a source cluster.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val producerConfigOpt = parser.accepts("producer.config",
+      "Embedded producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val numStreamsOpt = parser.accepts("num.streams",
+      "Number of mirroring streams.")
+      .withRequiredArg()
+      .describedAs("Number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val whitelistOpt = parser.accepts("whitelist",
+      "Whitelist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+
+    val blacklistOpt = parser.accepts("blacklist",
+      "Blacklist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+    val options = parser.parse(args : _*)
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+    CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
+    if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
+      fatal("Exactly one of whitelist or blacklist is required.")
+      System.exit(1)
+    }
+    val numStreams = options.valueOf(numStreamsOpt).intValue()
+    if (numStreams <= 0) {
+      // Zero streams would mean zero producers and threads: the tool would mirror nothing.
+      fatal("num.streams must be a positive integer.")
+      System.exit(1)
+    }
+    val filterSpec =
+      if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt))
+      else new Blacklist(options.valueOf(blacklistOpt))
+
+    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+    // Make the producers block instead of failing when their buffer is full.
+    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL, "true")
+    val consumerConfig = options.valueOf(consumerConfigOpt)
+
+    producerChannel = new ProducerDataChannel()
+    connector = new ZookeeperConsumerConnector(new ConsumerConfig(Utils.loadProps(consumerConfig)))
+    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
+    try {
+      streams = connector.createMessageStreamsByFilter(filterSpec, numStreams)
+      debug("%d consumer streams created".format(streams.size))
+    } catch {
+      case t: Throwable =>
+        // Log the cause so the startup failure can actually be diagnosed.
+        fatal("Unable to create stream - shutting down mirror maker.", t)
+        connector.shutdown()
+        System.exit(1)
+    }
+
+    // One producer per consumer stream.
+    streams.foreach(_ => producerChannel.addProducer(new KafkaProducer(producerProps)))
+    mirroringThreads = streams.zipWithIndex.map { case (stream, threadId) =>
+      new MirrorMakerThread(stream, threadId)
+    }
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        cleanShutdown()
+      }
+    })
+
+    mirroringThreads.foreach(_.start())
+    // Threads also stop on their own (consumer timeout / other exception), so wait for all.
+    mirroringThreads.foreach(_.awaitShutdown())
+    cleanShutdown()
+  }
+
+  /**
+   * Shuts down the consumer connector, waits for every mirroring thread to finish and then
+   * closes all producers. Only the first invocation does the work; subsequent calls (for
+   * example from the shutdown hook after main() already cleaned up) return immediately.
+   */
+  def cleanShutdown() {
+    if (shutdownStarted.compareAndSet(false, true)) {
+      if (connector != null) connector.shutdown()
+      if (mirroringThreads != null) mirroringThreads.foreach(_.awaitShutdown())
+      if (producerChannel != null) producerChannel.close()
+      info("Kafka mirror maker shutdown successfully")
+    }
+  }
+
+  /**
+   * Drains one consumer stream and forwards each message (topic, key, value) to the
+   * shared [[ProducerDataChannel]]. The thread ends when iteration over the stream ends
+   * or throws; [[awaitShutdown]] blocks until that has happened.
+   *
+   * @param stream   consumer stream to mirror
+   * @param threadId index used in the thread name ("mirrormaker-<threadId>")
+   */
+  class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
+                          threadId: Int)
+      extends Thread with Logging {
+
+    private val shutdownLatch = new CountDownLatch(1)
+    private val threadName = "mirrormaker-" + threadId
+    this.logIdent = "[%s] ".format(threadName)
+
+    this.setName(threadName)
+
+    override def run() {
+      info("Starting mirror maker thread " + threadName)
+      try {
+        for (msgAndMetadata <- stream) {
+          producerChannel.send(new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key(), msgAndMetadata.message()))
+        }
+      } catch {
+        case e: Throwable =>
+          fatal("Stream unexpectedly exited.", e)
+      } finally {
+        // Always release waiters, whether the stream ended normally or with an error.
+        shutdownLatch.countDown()
+        info("Stopped thread.")
+      }
+    }
+
+    /** Blocks until run() has finished; an interrupt while waiting is logged and ends the wait. */
+    def awaitShutdown() {
+      try {
+        shutdownLatch.await()
+      } catch {
+        case e: InterruptedException =>
+          fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
+      }
+    }
+  }
+
+  /**
+   * Fans records out over a pool of producers. A record with a key always goes to the
+   * producer selected by the hash of the key bytes, so equal keys share a producer;
+   * keyless records are distributed round-robin.
+   */
+  class ProducerDataChannel extends Logging {
+    val producers = new ListBuffer[KafkaProducer]
+    // send() is called concurrently by all mirroring threads, so the round-robin cursor
+    // must be atomic; a plain var would race and lose updates.
+    private val producerIndex = new AtomicInteger(0)
+
+    /** Adds a producer to the pool; expected to be called before the mirroring threads start. */
+    def addProducer(producer: KafkaProducer) {
+      producers += producer
+    }
+
+    /** Sends the record through the producer chosen by its key, or round-robin if it has none. */
+    def send(producerRecord: ProducerRecord) {
+      if (producerRecord.key() != null) {
+        val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
+        trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
+        producers(producerId).send(producerRecord)
+      } else {
+        // Utils.abs keeps the index non-negative once the counter overflows Int.MaxValue.
+        val producerId = Utils.abs(producerIndex.getAndIncrement()) % producers.size
+        producers(producerId).send(producerRecord)
+      }
+    }
+
+    /** Closes every producer in the pool. */
+    def close() {
+      producers.foreach(_.close())
+    }
+  }
+}
+