You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/18 06:54:13 UTC
[19/27] git commit: KAFKA-808 Migration tool internal queue between
consumer and producer threads should be configurable; reviewed by Jun Rao
KAFKA-808 Migration tool internal queue between consumer and producer threads should be configurable; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7fd9268f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7fd9268f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7fd9268f
Branch: refs/heads/trunk
Commit: 7fd9268f7b89afa2e41b526f1f4d7bfc90a519cf
Parents: 3b3fb7f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 15 08:31:13 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 15 08:31:13 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/KafkaMigrationTool.java | 10 +++++++++-
1 files changed, 9 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fd9268f/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 7f0d1ce..95fbe46 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -134,6 +134,13 @@ public class KafkaMigrationTool {
.describedAs("Java regex (String)")
.ofType(String.class);
+ ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
+ = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
+ .withRequiredArg()
+ .describedAs("Queue size in terms of number of messages")
+ .ofType(Integer.class)
+ .defaultsTo(10000);
+
OptionSpecBuilder helpOpt
= parser.accepts("help", "Print this message.");
@@ -212,7 +219,8 @@ public class KafkaMigrationTool {
kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
// create a producer channel instead
- ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+ int queueSize = options.valueOf(queueSizeOpt);
+ ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize);
int threadId = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {