You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/28 17:54:11 UTC

git commit: KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 be3ce1472 -> 66b103895


KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/66b10389
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/66b10389
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/66b10389

Branch: refs/heads/0.8
Commit: 66b103895720700c5ea47e8f75a1105aca6e9342
Parents: be3ce14
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Mar 28 09:54:02 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Mar 28 09:54:02 2013 -0700

----------------------------------------------------------------------
 config/producer.properties                         |    2 +-
 .../scala/kafka/javaapi/producer/Producer.scala    |    4 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |   25 ++---
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   79 +++++++++------
 4 files changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index a1c8cb2..cc8f5f6 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -31,7 +31,7 @@ producer.type=sync
 compression.codec=none
 
 # message encoder
-serializer.class=kafka.serializer.StringEncoder
+serializer.class=kafka.serializer.DefaultEncoder
 
 # allow topic level compression
 #compressed.topics=

http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 424ef39..7265328 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -26,7 +26,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
   /**
    * Sends the data to a single topic, partitioned by key, using either the
    * synchronous or the asynchronous producer
-   * @param producerData the producer data object that encapsulates the topic, key and message data
+   * @param message the producer data object that encapsulates the topic, key and message data
    */
   def send(message: KeyedMessage[K,V]) {
     underlying.send(message)
@@ -34,7 +34,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
 
   /**
    * Use this API to send data to multiple topics
-   * @param producerData list of producer data objects that encapsulate the topic, key and message data
+   * @param messages list of producer data objects that encapsulate the topic, key and message data
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index eb63d75..a15b350 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -220,7 +220,7 @@ public class KafkaMigrationTool {
       kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
       // create a producer channel instead
       int queueSize = options.valueOf(queueSizeOpt);
-      ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
+      ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
       int threadId = 0;
 
       Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -279,8 +279,7 @@ public class KafkaMigrationTool {
     }
   }
 
-
-  private static class ProducerDataChannel<T> {
+  static class ProducerDataChannel<T> {
     private final int producerQueueSize;
     private final BlockingQueue<T> producerRequestQueue;
 
@@ -300,14 +299,14 @@ public class KafkaMigrationTool {
 
   private static class MigrationThread extends Thread {
     private final Object stream;
-    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
     private final int threadId;
     private final String threadName;
     private final org.apache.log4j.Logger logger;
     private CountDownLatch shutdownComplete = new CountDownLatch(1);
     private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
-    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
+    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
       stream = _stream;
       producerDataChannel = _producerDataChannel;
       threadId = _threadId;
@@ -336,7 +335,7 @@ public class KafkaMigrationTool {
           ((ByteBuffer)payload_07).get(bytes);
           if(logger.isDebugEnabled())
             logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
-          KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
+          KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
           producerDataChannel.sendRequest(producerData);
         }
         logger.info("Migration thread " + threadName + " finished running");
@@ -362,17 +361,17 @@ public class KafkaMigrationTool {
     }
   }
 
-  private static class ProducerThread extends Thread {
-    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
-    private final Producer<String, byte[]> producer;
+  static class ProducerThread extends Thread {
+    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
+    private final Producer<byte[], byte[]> producer;
     private final int threadId;
     private String threadName;
     private org.apache.log4j.Logger logger;
     private CountDownLatch shutdownComplete = new CountDownLatch(1);
-    private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+    private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
 
-    public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
-                          Producer<String, byte[]> _producer,
+    public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
+                          Producer<byte[], byte[]> _producer,
                           int _threadId) {
       producerDataChannel = _producerDataChannel;
       producer = _producer;
@@ -385,7 +384,7 @@ public class KafkaMigrationTool {
     public void run() {
       try{
         while(true) {
-          KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+          KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
           if(!data.equals(shutdownMessage))
             producer.send(data);
           else

http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5c4b3d2..3d22dc7 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,6 +24,9 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
 import kafka.consumer._
 import kafka.serializer._
+import collection.mutable.ListBuffer
+import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
+import kafka.javaapi
 
 
 object MirrorMaker extends Logging {
@@ -59,7 +62,13 @@ object MirrorMaker extends Logging {
       .describedAs("Number of threads")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
-    
+
+    val bufferSizeOpt =  parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
+      .withRequiredArg()
+      .describedAs("Queue size in terms of number of messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10000);
+
     val whitelistOpt = parser.accepts("whitelist",
       "Whitelist of topics to mirror.")
       .withRequiredArg()
@@ -88,6 +97,7 @@ object MirrorMaker extends Logging {
     }
 
     val numStreams = options.valueOf(numStreamsOpt)
+    val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
       val config = new ProducerConfig(
@@ -95,52 +105,63 @@ object MirrorMaker extends Logging {
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
-    val threads = {
-      val connectors = options.valuesOf(consumerConfigOpt).toList
-              .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
-              .map(new ZookeeperConsumerConnector(_))
+    val connectors = options.valuesOf(consumerConfigOpt).toList
+            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+            .map(new ZookeeperConsumerConnector(_))
 
-      Runtime.getRuntime.addShutdownHook(new Thread() {
-        override def run() {
-          connectors.foreach(_.shutdown())
-          producers.foreach(_.close())
-        }
-      })
+    val filterSpec = if (options.has(whitelistOpt))
+      new Whitelist(options.valueOf(whitelistOpt))
+    else
+      new Blacklist(options.valueOf(blacklistOpt))
 
-      val filterSpec = if (options.has(whitelistOpt))
-        new Whitelist(options.valueOf(whitelistOpt))
-      else
-        new Blacklist(options.valueOf(blacklistOpt))
+    val streams =
+      connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
 
-      val streams =
-        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+    val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
-      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
-    }
+    val consumerThreads =
+      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+
+    val producerThreads = new ListBuffer[ProducerThread]()
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        connectors.foreach(_.shutdown)
+        consumerThreads.foreach(_.awaitShutdown)
+        producerThreads.foreach(_.shutdown)
+        producerThreads.foreach(_.awaitShutdown)
+        logger.info("Kafka migration tool shutdown successfully");
+      }
+    })
 
-    threads.foreach(_.start())
+    // create producer threads
+    var i: Int = 1
+    for(producer <- producers) {
+      val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel,
+        new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i)
+      producerThreads += producerThread
+      i += 1
+    }
 
-    threads.foreach(_.awaitShutdown())
+    consumerThreads.foreach(_.start)
+    producerThreads.foreach(_.start)
   }
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
+                          producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-" + threadId
-    private val producerSelector = Utils.circularIterator(producers)
 
     this.setName(threadName)
 
     override def run() {
       try {
         for (msgAndMetadata <- stream) {
-          val producer = producerSelector.next()
-          val pd = new KeyedMessage[Array[Byte], Array[Byte]](
-            msgAndMetadata.topic, msgAndMetadata.message)
-          producer.send(pd)
+          val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+          producerDataChannel.sendRequest(pd)
         }
       } catch {
         case e =>
@@ -155,9 +176,7 @@ object MirrorMaker extends Logging {
       try {
         shutdownLatch.await()
       } catch {
-        case e: InterruptedException => fatal(
-          "Shutdown of thread %s interrupted. This might leak data!"
-                  .format(threadName))
+        case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
       }
     }
   }