You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/18 01:34:08 UTC

svn commit: r1158969 - in /incubator/kafka/trunk: core/src/main/scala/kafka/consumer/ConsumerConfig.scala core/src/main/scala/kafka/server/KafkaServerStartable.scala system_test/embedded_consumer/config/whitelisttest.consumer.properties

Author: junrao
Date: Wed Aug 17 23:34:08 2011
New Revision: 1158969

URL: http://svn.apache.org/viewvc?rev=1158969&view=rev
Log:
change whitelist config for mirroring; patched by Joel; KAFKA-103

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Aug 17 23:34:08 2011
@@ -34,9 +34,11 @@ object ConsumerConfig {
   val ConsumerTimeoutMs = -1
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
+  val MirrorConsumerNumThreads = 1
 
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
+  val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
 }
 
 class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -85,17 +87,19 @@ class ConsumerConfig(props: Properties) 
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
 
   /** Whitelist of topics for this mirror's embedded consumer to consume. At
-   *  most one of whitelist/blacklist may be specified.
-   *  e.g., topic1:1,topic2:1 */
-  val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString(
-    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist))
+   *  most one of whitelist/blacklist may be specified. */
+  val mirrorTopicsWhitelist = Utils.getString(
+    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
  
   /** Topics to skip mirroring. At most one of whitelist/blacklist may be
    *  specified */
   val mirrorTopicsBlackList = Utils.getString(
     props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
 
-  if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty)
+  if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
       throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
+
+  val mirrorConsumerNumThreads = Utils.getInt(
+    props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
 }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Wed Aug 17 23:34:08 2011
@@ -65,6 +65,9 @@ class EmbeddedConsumer(private val consu
 
   private val logger = Logger.getLogger(getClass)
 
+  private val whiteListTopics =
+    consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
+
   private val blackListTopics =
     consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
 
@@ -78,8 +81,8 @@ class EmbeddedConsumer(private val consu
 
 
   private def isTopicAllowed(topic: String) = {
-    if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty)
-      consumerConfig.mirrorTopicsWhitelistMap.contains(topic)
+    if (consumerConfig.mirrorTopicsWhitelist.nonEmpty)
+      whiteListTopics.contains(topic)
     else
       !blackListTopics.contains(topic)
   }
@@ -107,7 +110,9 @@ class EmbeddedConsumer(private val consu
 
   private def makeTopicMap(mirrorTopics: Seq[String]) = {
     if (mirrorTopics.nonEmpty)
-      Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1"))
+      Utils.getConsumerTopicMap(mirrorTopics.mkString(
+        "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
+        ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
     else
       Utils.getConsumerTopicMap("")
   }

Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties Wed Aug 17 23:34:08 2011
@@ -11,5 +11,5 @@ zk.connectiontimeout.ms=1000000
 #consumer group id
 groupid=group1
 
-mirror.topics.whitelist=test01:1
+mirror.topics.whitelist=test01