You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/18 06:54:13 UTC

[19/27] git commit: KAFKA-808 Migration tool internal queue between consumer and producer threads should be configurable; reviewed by Jun Rao

KAFKA-808 Migration tool internal queue between consumer and producer threads should be configurable; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7fd9268f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7fd9268f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7fd9268f

Branch: refs/heads/trunk
Commit: 7fd9268f7b89afa2e41b526f1f4d7bfc90a519cf
Parents: 3b3fb7f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 15 08:31:13 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 15 08:31:13 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/KafkaMigrationTool.java |   10 +++++++++-
 1 files changed, 9 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7fd9268f/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 7f0d1ce..95fbe46 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -134,6 +134,13 @@ public class KafkaMigrationTool {
       .describedAs("Java regex (String)")
       .ofType(String.class);
 
+    ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
+      =  parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
+      .withRequiredArg()
+      .describedAs("Queue size in terms of number of messages")
+      .ofType(Integer.class)
+      .defaultsTo(10000);
+
     OptionSpecBuilder helpOpt
       = parser.accepts("help", "Print this message.");
 
@@ -212,7 +219,8 @@ public class KafkaMigrationTool {
       kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
       kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
       // create a producer channel instead
-      ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+      int queueSize = options.valueOf(queueSizeOpt);
+      ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
       int threadId = 0;
 
       Runtime.getRuntime().addShutdownHook(new Thread() {