You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/18 01:34:08 UTC
svn commit: r1158969 - in /incubator/kafka/trunk:
core/src/main/scala/kafka/consumer/ConsumerConfig.scala
core/src/main/scala/kafka/server/KafkaServerStartable.scala
system_test/embedded_consumer/config/whitelisttest.consumer.properties
Author: junrao
Date: Wed Aug 17 23:34:08 2011
New Revision: 1158969
URL: http://svn.apache.org/viewvc?rev=1158969&view=rev
Log:
change whitelist config for mirroring; patched by Joel; KAFKA-103
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Aug 17 23:34:08 2011
@@ -34,9 +34,11 @@ object ConsumerConfig {
val ConsumerTimeoutMs = -1
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
+ val MirrorConsumerNumThreads = 1
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
+ val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
}
class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -85,17 +87,19 @@ class ConsumerConfig(props: Properties)
val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
/** Whitelist of topics for this mirror's embedded consumer to consume. At
- * most one of whitelist/blacklist may be specified.
- * e.g., topic1:1,topic2:1 */
- val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString(
- props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist))
+ * most one of whitelist/blacklist may be specified. */
+ val mirrorTopicsWhitelist = Utils.getString(
+ props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
/** Topics to skip mirroring. At most one of whitelist/blacklist may be
* specified */
val mirrorTopicsBlackList = Utils.getString(
props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
- if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty)
+ if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
+
+ val mirrorConsumerNumThreads = Utils.getInt(
+ props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Wed Aug 17 23:34:08 2011
@@ -65,6 +65,9 @@ class EmbeddedConsumer(private val consu
private val logger = Logger.getLogger(getClass)
+ private val whiteListTopics =
+ consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
+
private val blackListTopics =
consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
@@ -78,8 +81,8 @@ class EmbeddedConsumer(private val consu
private def isTopicAllowed(topic: String) = {
- if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty)
- consumerConfig.mirrorTopicsWhitelistMap.contains(topic)
+ if (consumerConfig.mirrorTopicsWhitelist.nonEmpty)
+ whiteListTopics.contains(topic)
else
!blackListTopics.contains(topic)
}
@@ -107,7 +110,9 @@ class EmbeddedConsumer(private val consu
private def makeTopicMap(mirrorTopics: Seq[String]) = {
if (mirrorTopics.nonEmpty)
- Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1"))
+ Utils.getConsumerTopicMap(mirrorTopics.mkString(
+ "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
+ ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
else
Utils.getConsumerTopicMap("")
}
Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties?rev=1158969&r1=1158968&r2=1158969&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties Wed Aug 17 23:34:08 2011
@@ -11,5 +11,5 @@ zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
-mirror.topics.whitelist=test01:1
+mirror.topics.whitelist=test01