You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 20:30:22 UTC
kafka git commit: KAFKA-2761: enable whitelist regex subscription for
new consumer in ConsoleConsumer
Repository: kafka
Updated Branches:
refs/heads/trunk 23cc9c77b -> 34d997665
KAFKA-2761: enable whitelist regex subscription for new consumer in ConsoleConsumer
Pull request #412 is a prerequisite for this change.
Author: Ashish Singh <as...@cloudera.com>
Reviewers: Guozhang Wang
Closes #445 from SinghAsDev/KAFKA-2761
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34d99766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34d99766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34d99766
Branch: refs/heads/trunk
Commit: 34d9976656893abcec3d24c54b367eeb091c101d
Parents: 23cc9c7
Author: Ashish Singh <as...@cloudera.com>
Authored: Sun Nov 8 11:36:14 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 11:36:14 2015 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/consumer/BaseConsumer.scala | 15 ++++++++++++---
.../src/main/scala/kafka/tools/ConsoleConsumer.scala | 9 ++++++---
2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/34d99766/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 52cd5fa..ced4391 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -18,6 +18,9 @@
package kafka.consumer
import java.util.Properties
+import java.util.regex.Pattern
+
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
/**
* A base consumer used to abstract both old and new consumer
@@ -33,13 +36,19 @@ trait BaseConsumer {
case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
-class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
+class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
-import scala.collection.JavaConversions._
+ import scala.collection.JavaConversions._
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
- consumer.subscribe(List(topic))
+ if (topic.isDefined)
+ consumer.subscribe(List(topic.get))
+ else if (whitelist.isDefined)
+ consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
+ else
+ throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
+
var recordIter = consumer.poll(0).iterator
override def receive(): BaseConsumerRecord = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/34d99766/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9f296bd..2b1a69a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -52,7 +52,7 @@ object ConsoleConsumer extends Logging {
val consumer =
if (conf.useNewConsumer) {
val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
- new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
+ new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
} else {
checkZk(conf)
new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@@ -243,11 +243,14 @@ object ConsoleConsumer extends Logging {
// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, exactly one of whitelist/topic is required.
var topicArg: String = null
+ var whitelistArg: String = null
var filterSpec: TopicFilter = null
if (useNewConsumer) {
- if (!options.has(topicIdOpt))
- CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
+ val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
+ if (topicOrFilterOpt.size != 1)
+ CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
topicArg = options.valueOf(topicIdOpt)
+ whitelistArg = options.valueOf(whitelistOpt)
} else {
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)