You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/28 17:54:11 UTC
git commit: KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 be3ce1472 -> 66b103895
KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/66b10389
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/66b10389
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/66b10389
Branch: refs/heads/0.8
Commit: 66b103895720700c5ea47e8f75a1105aca6e9342
Parents: be3ce14
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Mar 28 09:54:02 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Mar 28 09:54:02 2013 -0700
----------------------------------------------------------------------
config/producer.properties | 2 +-
.../scala/kafka/javaapi/producer/Producer.scala | 4 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 25 ++---
core/src/main/scala/kafka/tools/MirrorMaker.scala | 79 +++++++++------
4 files changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index a1c8cb2..cc8f5f6 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -31,7 +31,7 @@ producer.type=sync
compression.codec=none
# message encoder
-serializer.class=kafka.serializer.StringEncoder
+serializer.class=kafka.serializer.DefaultEncoder
# allow topic level compression
#compressed.topics=
http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 424ef39..7265328 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -26,7 +26,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
/**
* Sends the data to a single topic, partitioned by key, using either the
* synchronous or the asynchronous producer
- * @param producerData the producer data object that encapsulates the topic, key and message data
+ * @param message the producer data object that encapsulates the topic, key and message data
*/
def send(message: KeyedMessage[K,V]) {
underlying.send(message)
@@ -34,7 +34,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
/**
* Use this API to send data to multiple topics
- * @param producerData list of producer data objects that encapsulate the topic, key and message data
+ * @param messages list of producer data objects that encapsulate the topic, key and message data
*/
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index eb63d75..a15b350 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -220,7 +220,7 @@ public class KafkaMigrationTool {
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
// create a producer channel instead
int queueSize = options.valueOf(queueSizeOpt);
- ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
+ ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
int threadId = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -279,8 +279,7 @@ public class KafkaMigrationTool {
}
}
-
- private static class ProducerDataChannel<T> {
+ static class ProducerDataChannel<T> {
private final int producerQueueSize;
private final BlockingQueue<T> producerRequestQueue;
@@ -300,14 +299,14 @@ public class KafkaMigrationTool {
private static class MigrationThread extends Thread {
private final Object stream;
- private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+ private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
private final int threadId;
private final String threadName;
private final org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
private final AtomicBoolean isRunning = new AtomicBoolean(true);
- MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
+ MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
stream = _stream;
producerDataChannel = _producerDataChannel;
threadId = _threadId;
@@ -336,7 +335,7 @@ public class KafkaMigrationTool {
((ByteBuffer)payload_07).get(bytes);
if(logger.isDebugEnabled())
logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
- KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
+ KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
producerDataChannel.sendRequest(producerData);
}
logger.info("Migration thread " + threadName + " finished running");
@@ -362,17 +361,17 @@ public class KafkaMigrationTool {
}
}
- private static class ProducerThread extends Thread {
- private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
- private final Producer<String, byte[]> producer;
+ static class ProducerThread extends Thread {
+ private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
+ private final Producer<byte[], byte[]> producer;
private final int threadId;
private String threadName;
private org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
- private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+ private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
- public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
- Producer<String, byte[]> _producer,
+ public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
+ Producer<byte[], byte[]> _producer,
int _threadId) {
producerDataChannel = _producerDataChannel;
producer = _producer;
@@ -385,7 +384,7 @@ public class KafkaMigrationTool {
public void run() {
try{
while(true) {
- KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+ KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
if(!data.equals(shutdownMessage))
producer.send(data);
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5c4b3d2..3d22dc7 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,6 +24,9 @@ import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
import kafka.consumer._
import kafka.serializer._
+import collection.mutable.ListBuffer
+import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
+import kafka.javaapi
object MirrorMaker extends Logging {
@@ -59,7 +62,13 @@ object MirrorMaker extends Logging {
.describedAs("Number of threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
-
+
+ val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
+ .withRequiredArg()
+ .describedAs("Queue size in terms of number of messages")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(10000);
+
val whitelistOpt = parser.accepts("whitelist",
"Whitelist of topics to mirror.")
.withRequiredArg()
@@ -88,6 +97,7 @@ object MirrorMaker extends Logging {
}
val numStreams = options.valueOf(numStreamsOpt)
+ val bufferSize = options.valueOf(bufferSizeOpt).intValue()
val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
val config = new ProducerConfig(
@@ -95,52 +105,63 @@ object MirrorMaker extends Logging {
new Producer[Array[Byte], Array[Byte]](config)
})
- val threads = {
- val connectors = options.valuesOf(consumerConfigOpt).toList
- .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
- .map(new ZookeeperConsumerConnector(_))
+ val connectors = options.valuesOf(consumerConfigOpt).toList
+ .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+ .map(new ZookeeperConsumerConnector(_))
- Runtime.getRuntime.addShutdownHook(new Thread() {
- override def run() {
- connectors.foreach(_.shutdown())
- producers.foreach(_.close())
- }
- })
+ val filterSpec = if (options.has(whitelistOpt))
+ new Whitelist(options.valueOf(whitelistOpt))
+ else
+ new Blacklist(options.valueOf(blacklistOpt))
- val filterSpec = if (options.has(whitelistOpt))
- new Whitelist(options.valueOf(whitelistOpt))
- else
- new Blacklist(options.valueOf(blacklistOpt))
+ val streams =
+ connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
- val streams =
- connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+ val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
- streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
- }
+ val consumerThreads =
+ streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+
+ val producerThreads = new ListBuffer[ProducerThread]()
+
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ connectors.foreach(_.shutdown)
+ consumerThreads.foreach(_.awaitShutdown)
+ producerThreads.foreach(_.shutdown)
+ producerThreads.foreach(_.awaitShutdown)
+ logger.info("Kafka migration tool shutdown successfully");
+ }
+ })
- threads.foreach(_.start())
+ // create producer threads
+ var i: Int = 1
+ for(producer <- producers) {
+ val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel,
+ new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i)
+ producerThreads += producerThread
+ i += 1
+ }
- threads.foreach(_.awaitShutdown())
+ consumerThreads.foreach(_.start)
+ producerThreads.foreach(_.start)
}
class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
- producers: Seq[Producer[Array[Byte], Array[Byte]]],
+ producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
threadId: Int)
extends Thread with Logging {
private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-" + threadId
- private val producerSelector = Utils.circularIterator(producers)
this.setName(threadName)
override def run() {
try {
for (msgAndMetadata <- stream) {
- val producer = producerSelector.next()
- val pd = new KeyedMessage[Array[Byte], Array[Byte]](
- msgAndMetadata.topic, msgAndMetadata.message)
- producer.send(pd)
+ val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+ producerDataChannel.sendRequest(pd)
}
} catch {
case e =>
@@ -155,9 +176,7 @@ object MirrorMaker extends Logging {
try {
shutdownLatch.await()
} catch {
- case e: InterruptedException => fatal(
- "Shutdown of thread %s interrupted. This might leak data!"
- .format(threadName))
+ case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
}
}
}