You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/30 17:37:39 UTC
git commit: KAFKA-1184 High-Level Consumer: expose fetcher threads
number as a parameter; reviewed by Neha Narkhede
Updated Branches:
refs/heads/trunk 269d16d3c -> 9c1d8e35c
KAFKA-1184 High-Level Consumer: expose fetcher threads number as a parameter; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c1d8e35
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c1d8e35
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c1d8e35
Branch: refs/heads/trunk
Commit: 9c1d8e35c5913d22098cf80ae69035131cfee87f
Parents: 269d16d
Author: Evelina Stepanova <st...@gmail.com>
Authored: Thu Jan 30 08:36:53 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Jan 30 08:37:28 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 4 ++++
.../src/main/scala/kafka/consumer/ConsumerFetcherManager.scala | 2 +-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 1 +
perf/src/main/scala/kafka/perf/ConsumerPerformance.scala | 6 ++++++
4 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index c8c4212..e6875d6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -28,6 +28,7 @@ object ConsumerConfig extends Config {
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
val MaxFetchSize = 10*FetchSize
+ val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000
@@ -93,6 +94,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** the number of byes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
+
+ /** the number threads used to fetch data */
+ val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index e4451bb..b9e2bea 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
private val config: ConsumerConfig,
private val zkClient : ZkClient)
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
- config.clientId, 1) {
+ config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..426b1a7 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -150,6 +150,7 @@ object TestUtils extends Logging {
props.put("auto.commit.interval.ms", "1000")
props.put("rebalance.max.retries", "4")
props.put("auto.offset.reset", "smallest")
+ props.put("num.consumer.fetchers", "2")
props
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ec3cd29..55ee01b 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -112,6 +112,11 @@ object ConsumerPerformance {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10)
+ val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
+ .withRequiredArg
+ .describedAs("count")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
val options = parser.parse(args : _*)
@@ -130,6 +135,7 @@ object ConsumerPerformance {
props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "5000")
+ props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
val consumerConfig = new ConsumerConfig(props)
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)