You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 20:30:22 UTC

kafka git commit: KAFKA-2761: enable whitelist regex subscription for new consumer in ConsoleConsumer

Repository: kafka
Updated Branches:
  refs/heads/trunk 23cc9c77b -> 34d997665


KAFKA-2761: enable whitelist regex subscription for new consumer in ConsoleConsumer

#412 is a pre-req.

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Guozhang Wang

Closes #445 from SinghAsDev/KAFKA-2761


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34d99766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34d99766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34d99766

Branch: refs/heads/trunk
Commit: 34d9976656893abcec3d24c54b367eeb091c101d
Parents: 23cc9c7
Author: Ashish Singh <as...@cloudera.com>
Authored: Sun Nov 8 11:36:14 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 11:36:14 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/consumer/BaseConsumer.scala | 15 ++++++++++++---
 .../src/main/scala/kafka/tools/ConsoleConsumer.scala |  9 ++++++---
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/34d99766/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 52cd5fa..ced4391 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -18,6 +18,9 @@
 package kafka.consumer
 
 import java.util.Properties
+import java.util.regex.Pattern
+
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 
 /**
  * A base consumer used to abstract both old and new consumer
@@ -33,13 +36,19 @@ trait BaseConsumer {
 
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
 
-class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
+class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
-import scala.collection.JavaConversions._
+  import scala.collection.JavaConversions._
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-  consumer.subscribe(List(topic))
+  if (topic.isDefined)
+    consumer.subscribe(List(topic.get))
+  else if (whitelist.isDefined)
+    consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
+  else
+    throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
+
   var recordIter = consumer.poll(0).iterator
 
   override def receive(): BaseConsumerRecord = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/34d99766/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9f296bd..2b1a69a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -52,7 +52,7 @@ object ConsoleConsumer extends Logging {
     val consumer =
       if (conf.useNewConsumer) {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
-        new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
+        new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
       } else {
         checkZk(conf)
         new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@@ -243,11 +243,14 @@ object ConsoleConsumer extends Logging {
     // If using old consumer, exactly one of whitelist/blacklist/topic is required.
     // If using new consumer, topic must be specified.
     var topicArg: String = null
+    var whitelistArg: String = null
     var filterSpec: TopicFilter = null
     if (useNewConsumer) {
-      if (!options.has(topicIdOpt))
-        CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
+      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
       topicArg = options.valueOf(topicIdOpt)
+      whitelistArg = options.valueOf(whitelistOpt)
     } else {
       val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
       if (topicOrFilterOpt.size != 1)