You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/30 17:37:39 UTC

git commit: KAFKA-1184 High-Level Consumer: expose fetcher threads number as a parameter; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/trunk 269d16d3c -> 9c1d8e35c


KAFKA-1184 High-Level Consumer: expose fetcher threads number as a parameter; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c1d8e35
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c1d8e35
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c1d8e35

Branch: refs/heads/trunk
Commit: 9c1d8e35c5913d22098cf80ae69035131cfee87f
Parents: 269d16d
Author: Evelina Stepanova <st...@gmail.com>
Authored: Thu Jan 30 08:36:53 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Jan 30 08:37:28 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala        | 4 ++++
 .../src/main/scala/kafka/consumer/ConsumerFetcherManager.scala | 2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala           | 1 +
 perf/src/main/scala/kafka/perf/ConsumerPerformance.scala       | 6 ++++++
 4 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index c8c4212..e6875d6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -28,6 +28,7 @@ object ConsumerConfig extends Config {
   val SocketBufferSize = 64*1024
   val FetchSize = 1024 * 1024
   val MaxFetchSize = 10*FetchSize
+  val NumConsumerFetchers = 1
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 60 * 1000
@@ -93,6 +94,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   
   /** the number of byes of messages to attempt to fetch */
   val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
+
+  /** the number threads used to fetch data */
+  val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index e4451bb..b9e2bea 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, 1) {
+                                       config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..426b1a7 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -150,6 +150,7 @@ object TestUtils extends Logging {
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
     props.put("auto.offset.reset", "smallest")
+    props.put("num.consumer.fetchers", "2")
 
     props
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ec3cd29..55ee01b 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -112,6 +112,11 @@ object ConsumerPerformance {
                            .describedAs("count")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(10)
+    val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(1)
 
     val options = parser.parse(args : _*)
 
@@ -130,6 +135,7 @@ object ConsumerPerformance {
     props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
+    props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
     val consumerConfig = new ConsumerConfig(props)
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val topic = options.valueOf(topicOpt)