You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/11 00:32:23 UTC
svn commit: r1156393 - in /incubator/kafka/trunk: core/src/main/scala/kafka/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/server/
project/build/ system_test/...
Author: junrao
Date: Wed Aug 10 22:32:23 2011
New Revision: 1156393
URL: http://svn.apache.org/viewvc?rev=1156393&view=rev
Log:
auto-discovery of topics for mirroring; patched by Joel; reviewed by Jun; KAFKA-74
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties
incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties
incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
incubator/kafka/trunk/project/build/KafkaProject.scala
incubator/kafka/trunk/system_test/embedded_consumer/README
incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Wed Aug 10 22:32:23 2011
@@ -19,6 +19,7 @@ package kafka
import consumer.ConsumerConfig
import org.apache.log4j.Logger
+import producer.ProducerConfig
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.Utils
import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -30,21 +31,23 @@ object Kafka {
val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
- if(args.length != 1 && args.length != 2) {
- println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties")
+ if (!List(1, 3).contains(args.length)) {
+ println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
System.exit(1)
}
try {
- var kafkaServerStartble: KafkaServerStartable = null
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
- if (args.length == 2) {
- val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
- kafkaServerStartble = new KafkaServerStartable(serverConfig, consumerConfig)
+
+ val kafkaServerStartble = args.length match {
+ case 3 =>
+ val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
+ val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
+ new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
+ case 1 =>
+ new KafkaServerStartable(serverConfig)
}
- else
- kafkaServerStartble = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Aug 10 22:32:23 2011
@@ -20,7 +20,7 @@ package kafka.consumer
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
-
+import kafka.common.InvalidConfigException
object ConsumerConfig {
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
@@ -32,7 +32,11 @@ object ConsumerConfig {
val MaxQueuedChunks = 100
val AutoOffsetReset = OffsetRequest.SmallestTimeString
val ConsumerTimeoutMs = -1
- val EmbeddedConsumerTopics = ""
+ val MirrorTopicsWhitelist = ""
+ val MirrorTopicsBlacklist = ""
+
+ val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
+ val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
}
class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -80,7 +84,18 @@ class ConsumerConfig(props: Properties)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
- /* embed a consumer in the broker. e.g., topic1:1,topic2:1 */
- val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics",
- EmbeddedConsumerTopics))
+ /** Whitelist of topics for this mirror's embedded consumer to consume. At
+ * most one of whitelist/blacklist may be specified.
+ * e.g., topic1:1,topic2:1 */
+ val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString(
+ props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist))
+
+ /** Topics to skip mirroring. At most one of whitelist/blacklist may be
+ * specified */
+ val mirrorTopicsBlackList = Utils.getString(
+ props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
+
+ if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty)
+ throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
}
+
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+trait TopicEventHandler[T] {
+
+ def handleTopicEvent(allTopics: Seq[T])
+
+}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Aug 10 22:32:23 2011
@@ -28,6 +28,7 @@ import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.api.OffsetRequest
+import java.util.UUID
/**
* This class handles the consumers interaction with zookeeper
@@ -156,7 +157,10 @@ private[kafka] class ZookeeperConsumerCo
case Some(consumerId) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
- => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis
+ => val uuid = UUID.randomUUID()
+ consumerUuid = "%s-%d-%s".format(
+ InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+ uuid.getMostSignificantBits().toHexString.substring(0,8))
}
val consumerIdString = config.groupId + "_" + consumerUuid
val topicCount = new TopicCount(consumerIdString, topicCountMap)
@@ -164,6 +168,11 @@ private[kafka] class ZookeeperConsumerCo
// listener to consumer and partition changes
val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
registerConsumerInZK(dirs, consumerIdString, topicCount)
+
+ // register listener for session expired event
+ zkClient.subscribeStateChanges(
+ new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
+
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
// create a queue per topic per consumer thread
@@ -184,10 +193,6 @@ private[kafka] class ZookeeperConsumerCo
zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
}
- // register listener for session expired event
- zkClient.subscribeStateChanges(
- new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
-
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
ret
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.apache.log4j.Logger
+import scala.collection.JavaConversions._
+import kafka.utils.{Utils, ZkUtils, ZKStringSerializer}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
+ val eventHandler: TopicEventHandler[String]) {
+
+ private val logger = Logger.getLogger(getClass)
+
+ val lock = new Object()
+
+ private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, ZKStringSerializer)
+
+ startWatchingTopicEvents()
+
+ private def startWatchingTopicEvents() {
+ val topicEventListener = new ZkTopicEventListener
+ ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+
+ zkClient.subscribeStateChanges(
+ new ZkSessionExpireListener(topicEventListener))
+
+ val topics = zkClient.subscribeChildChanges(
+ ZkUtils.BrokerTopicsPath, topicEventListener).toList
+
+ // call to bootstrap topic list
+ topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
+ }
+
+ private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() }
+
+ def shutdown() {
+ lock.synchronized {
+ try {
+ if (zkClient != null) {
+ stopWatchingTopicEvents()
+ zkClient.close()
+ zkClient = null
+ }
+ else
+ logger.warn("Cannot shutdown already shutdown topic event watcher.")
+ }
+ catch {
+ case e =>
+ logger.fatal(e)
+ logger.fatal(Utils.stackTrace(e))
+ }
+ }
+ }
+
+ class ZkTopicEventListener() extends IZkChildListener {
+
+ @throws(classOf[Exception])
+ def handleChildChange(parent: String, children: java.util.List[String]) {
+ lock.synchronized {
+ try {
+ if (zkClient != null) {
+ val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+ logger.debug("all topics: %s".format(latestTopics))
+
+ eventHandler.handleTopicEvent(latestTopics)
+ }
+ }
+ catch {
+ case e =>
+ logger.fatal(e)
+ logger.fatal(Utils.stackTrace(e))
+ }
+ }
+ }
+
+ }
+
+ class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
+ extends IZkStateListener {
+
+ @throws(classOf[Exception])
+ def handleStateChanged(state: KeeperState) { }
+
+ @throws(classOf[Exception])
+ def handleNewSession() {
+ lock.synchronized {
+ if (zkClient != null) {
+ logger.info(
+ "ZK expired: resubscribing topic event listener to topic registry")
+ zkClient.subscribeChildChanges(
+ ZkUtils.BrokerTopicsPath, topicEventListener)
+ }
+ }
+ }
+ }
+}
+
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Aug 10 22:32:23 2011
@@ -22,7 +22,7 @@ import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.common.InvalidConfigException
-class ProducerConfig(val props: Properties) extends ZKConfig(props)
+class ProducerConfig(val props: Properties) extends ZKConfig(props)
with AsyncProducerConfigShared with SyncProducerConfigShared{
/** For bypassing zookeeper based auto partition discovery, use this config *
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Wed Aug 10 22:32:23 2011
@@ -17,7 +17,7 @@
package kafka.producer.async
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.utils.Utils
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.log4j.{Level, Logger}
@@ -90,7 +90,28 @@ private[kafka] class AsyncProducer[T](co
if(cbkHandler != null)
data = cbkHandler.beforeEnqueue(data)
- val added = queue.offer(data)
+ val added = if (config.enqueueTimeoutMs != 0) {
+ try {
+ if (config.enqueueTimeoutMs < 0) {
+ queue.put(data)
+ true
+ }
+ else {
+ queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ }
+ }
+ catch {
+ case e: InterruptedException =>
+ val msg = "%s interrupted during enqueue of event %s.".format(
+ getClass.getSimpleName, event.toString)
+ logger.error(msg)
+ throw new AsyncProducerInterruptedException(msg)
+ }
+ }
+ else {
+ queue.offer(data)
+ }
+
if(cbkHandler != null)
cbkHandler.afterEnqueue(data, added)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Wed Aug 10 22:32:23 2011
@@ -33,6 +33,14 @@ trait AsyncProducerConfigShared {
/** the maximum size of the blocking queue for buffering on the producer */
val queueSize = Utils.getInt(props, "queue.size", 10000)
+ /**
+ * Timeout for event enqueue:
+ * 0: events will be enqueued immediately or dropped if the queue is full
+ * -ve: enqueue will block indefinitely if the queue is full
+ * +ve: enqueue will block up to this many milliseconds if the queue is full
+ */
+ val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
+
/** the number of messages batched at the producer */
val batchSize = Utils.getInt(props, "batch.size", 200)
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+class AsyncProducerInterruptedException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
+
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Wed Aug 10 22:32:23 2011
@@ -17,24 +17,29 @@
package kafka.server
+import kafka.utils.Utils
+import kafka.consumer._
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.message.Message
import org.apache.log4j.Logger
-import kafka.consumer.{Consumer, ConsumerConnector, ConsumerConfig}
-import kafka.utils.{SystemTime, Utils}
-import kafka.api.RequestKeys
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig) {
+import scala.collection.Map
+
+class KafkaServerStartable(val serverConfig: KafkaConfig,
+ val consumerConfig: ConsumerConfig,
+ val producerConfig: ProducerConfig) {
private var server : KafkaServer = null
private var embeddedConsumer : EmbeddedConsumer = null
init
- def this(serverConfig: KafkaConfig) = this(serverConfig, null)
+ def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
private def init() {
server = new KafkaServer(serverConfig)
if (consumerConfig != null)
- embeddedConsumer = new EmbeddedConsumer(consumerConfig, server)
+ embeddedConsumer =
+ new EmbeddedConsumer(consumerConfig, producerConfig, server)
}
def startup() {
@@ -55,43 +60,106 @@ class KafkaServerStartable(val serverCon
}
class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
- private val kafkaServer: KafkaServer) {
- private val logger = Logger.getLogger(getClass())
- private val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
- private val topicMessageStreams = consumerConnector.createMessageStreams(consumerConfig.embeddedConsumerTopicMap)
-
- def startup() = {
- var threadList = List[Thread]()
- for ((topic, streamList) <- topicMessageStreams)
- for (i <- 0 until streamList.length)
- threadList ::= Utils.newThread("kafka-embedded-consumer-" + topic + "-" + i, new Runnable() {
- def run() {
- logger.info("starting consumer thread " + i + " for topic " + topic)
- val logManager = kafkaServer.getLogManager
- val stats = kafkaServer.getStats
- try {
- for (message <- streamList(i)) {
- val partition = logManager.chooseRandomPartition(topic)
- val start = SystemTime.nanoseconds
- logManager.getOrCreateLog(topic, partition).append(
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = message))
- stats.recordRequest(RequestKeys.Produce, SystemTime.nanoseconds - start)
+ private val producerConfig: ProducerConfig,
+ private val kafkaServer: KafkaServer) extends TopicEventHandler[String] {
+
+ private val logger = Logger.getLogger(getClass)
+
+ private val blackListTopics =
+ consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
+
+ // mirrorTopics should be accessed by handleTopicEvent only
+ private var mirrorTopics:Seq[String] = List()
+
+ private var consumerConnector: ConsumerConnector = null
+ private var topicEventWatcher:ZookeeperTopicEventWatcher = null
+
+ private val producer = new Producer[Null, Message](producerConfig)
+
+
+ private def isTopicAllowed(topic: String) = {
+ if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty)
+ consumerConfig.mirrorTopicsWhitelistMap.contains(topic)
+ else
+ !blackListTopics.contains(topic)
+ }
+
+ // TopicEventHandler call-back only
+ @Override
+ def handleTopicEvent(allTopics: Seq[String]) {
+ val newMirrorTopics = allTopics.filter(isTopicAllowed)
+
+ val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
+ if (addedTopics.nonEmpty)
+ logger.info("topic event: added topics = %s".format(addedTopics))
+
+ val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
+ if (deletedTopics.nonEmpty)
+ logger.info("topic event: deleted topics = %s".format(deletedTopics))
+
+ mirrorTopics = newMirrorTopics
+
+ if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
+ logger.info("mirror topics = %s".format(mirrorTopics))
+ startNewConsumerThreads(makeTopicMap(mirrorTopics))
+ }
+ }
+
+ private def makeTopicMap(mirrorTopics: Seq[String]) = {
+ if (mirrorTopics.nonEmpty)
+ Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1"))
+ else
+ Utils.getConsumerTopicMap("")
+ }
+
+ private def startNewConsumerThreads(topicMap: Map[String, Int]) {
+ if (topicMap.nonEmpty) {
+ if (consumerConnector != null)
+ consumerConnector.shutdown()
+ consumerConnector = Consumer.create(consumerConfig)
+ val topicMessageStreams = consumerConnector.createMessageStreams(topicMap)
+ var threadList = List[Thread]()
+ for ((topic, streamList) <- topicMessageStreams)
+ for (i <- 0 until streamList.length)
+ threadList ::= Utils.newThread("kafka-embedded-consumer-%s-%d".format(topic, i), new Runnable() {
+ def run() {
+ logger.info("Starting consumer thread %d for topic %s".format(i, topic))
+
+ try {
+ for (message <- streamList(i)) {
+ val pd = new ProducerData[Null, Message](topic, message)
+ producer.send(pd)
+ }
+ }
+ catch {
+ case e =>
+ logger.fatal(e + Utils.stackTrace(e))
+ logger.fatal(topic + " stream " + i + " unexpectedly exited")
}
}
- catch {
- case e =>
- logger.fatal(e + Utils.stackTrace(e))
- logger.fatal(topic + " stream " + i + " unexpectedly exited")
- }
- }
- }, false)
+ }, false)
- for (thread <- threadList)
- thread.start
+ for (thread <- threadList)
+ thread.start()
+ }
+ else
+ logger.info("Not starting consumer threads (mirror topic list is empty)")
}
- def shutdown() = {
- consumerConnector.shutdown
+ def startup() {
+ topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
+ /*
+ * consumer threads are (re-)started upon topic events (which includes an
+ * initial startup event which lists the current topics)
+ */
+ }
+
+ def shutdown() {
+ producer.close()
+ if (consumerConnector != null)
+ consumerConnector.shutdown()
+ if (topicEventWatcher != null)
+ topicEventWatcher.shutdown()
}
}
+
Modified: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (original)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Wed Aug 10 22:32:23 2011
@@ -103,6 +103,8 @@ class KafkaProject(info: ProjectInfo) ex
override def javaCompileOptions = super.javaCompileOptions ++
List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
+
+ override def packageAction = super.packageAction dependsOn (testCompileAction)
}
class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
Modified: incubator/kafka/trunk/system_test/embedded_consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/README?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/README (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/README Wed Aug 10 22:32:23 2011
@@ -1,8 +1,27 @@
-This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer.
-At the end, the messages produced at the source brokers should match that at the target brokers.
+This test replicates messages from 3 kafka brokers to 2 other kafka brokers
+using the embedded consumer. At the end, the messages produced at the source
+brokers should match that at the target brokers.
To run this test, do
bin/run-test.sh
-The expected output is given in bin/expected.out. There is only 1 thing that's important.
+The expected output is given in bin/expected.out. There is only 1 thing that's
+important.
1. The output should have a line "test passed".
+
+In the event of failure, by default the brokers and zookeepers remain running
+to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
+change this behavior by setting the action_on_fail flag in the script to "exit"
+or "proceed", in which case a snapshot of all the logs and directories is
+placed in the test's base directory.
+
+If you are making any changes that may affect the embedded consumer, it is a
+good idea to run the test in a loop. E.g.:
+
+:>/tmp/embeddedconsumer_test.log
+for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/embeddedconsumer_test.log; done
+tail -F /tmp/embeddedconsumer_test.log
+
+grep -ic passed /tmp/embeddedconsumer_test.log
+grep -ic failed /tmp/embeddedconsumer_test.log
+
Modified: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Wed Aug 10 22:32:23 2011
@@ -1,79 +1,314 @@
#!/bin/bash
-num_messages=400000
-message_size=400
+readonly num_messages=400000
+readonly message_size=400
+readonly action_on_fail="proceed"
-base_dir=$(dirname $0)/..
+readonly test_start_time="$(date +%s)"
-rm -rf /tmp/zookeeper_source
-rm -rf /tmp/zookeeper_target
-rm -rf /tmp/kafka-source1-logs
-mkdir /tmp/kafka-source1-logs
-mkdir /tmp/kafka-source1-logs/test01-0
-touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-source2-logs
-mkdir /tmp/kafka-source2-logs
-mkdir /tmp/kafka-source2-logs/test01-0
-touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-source3-logs
-mkdir /tmp/kafka-source3-logs
-mkdir /tmp/kafka-source3-logs/test01-0
-touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-target1-logs
-rm -rf /tmp/kafka-target2-logs
-
-echo "start the servers ..."
-$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
-$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
-
-sleep 4
-echo "start producing messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 &
-
-echo "wait for consumer to finish consuming ..."
-cur1_offset="-1"
-cur2_offset="-1"
-quit1=0
-quit2=0
-while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ]
-do
- sleep 2
- target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
- if [ $target1_size -eq $cur1_offset ]
- then
- quit1=1
- fi
- cur1_offset=$target1_size
- target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
- if [ $target2_size -eq $cur2_offset ]
- then
- quit2=1
- fi
- cur2_offset=$target2_size
-done
+readonly base_dir=$(dirname $0)/..
-sleep 2
-source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-
-expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size`
-actual_size=`expr $target_part0_size + $target_part1_size`
-if [ $expected_size != $actual_size ]
-then
- echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
-else
- echo "test passed"
-fi
+info() {
+ echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+kill_child_processes() {
+ isTopmost=$1
+ curPid=$2
+ childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+ for childPid in $childPids
+ do
+ kill_child_processes 0 $childPid
+ done
+ if [ $isTopmost -eq 0 ]; then
+ kill -15 $curPid 2> /dev/null
+ fi
+}
+
+cleanup() {
+ info "cleaning up"
+
+ pid_zk_source=
+ pid_zk_target=
+ pid_kafka_source1=
+ pid_kafka_source2=
+ pid_kafka_source3=
+ pid_kafka_target1=
+ pid_kafka_target2=
+ pid_producer=
+
+ rm -rf /tmp/zookeeper_source
+ rm -rf /tmp/zookeeper_target
+
+ rm -rf /tmp/kafka-source{1..3}-logs
+ # mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0
+ # touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka
+
+ rm -rf /tmp/kafka-target{1..2}-logs
+}
+
+begin_timer() {
+ t_begin=$(date +%s)
+}
+
+end_timer() {
+ t_end=$(date +%s)
+}
+
+start_zk() {
+ info "starting zookeepers"
+ $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
+ pid_zk_source=$!
+ $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+ pid_zk_target=$!
+}
+
+start_source_servers() {
+ info "starting source cluster"
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
+ pid_kafka_source1=$!
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
+ pid_kafka_source2=$!
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
+ pid_kafka_source3=$!
+}
+
+start_target_servers_for_whitelist_test() {
+ echo "starting mirror cluster"
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
+ pid_kafka_target1=$!
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
+ pid_kafka_target2=$!
+}
+
+start_target_servers_for_blacklist_test() {
+ echo "starting mirror cluster"
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
+ pid_kafka_target1=$!
+ $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
+ pid_kafka_target2=$!
+}
+
+shutdown_servers() {
+ info "stopping producer"
+ if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
+
+ info "shutting down target servers"
+ if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi
+ if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi
+ sleep 2
+
+ info "shutting down source servers"
+ if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi
+ if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi
+ if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi
+
+ info "shutting down zookeeper servers"
+ if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
+ if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
+}
+
+start_producer() {
+ topic=$1
+ info "start producing messages for topic $topic ..."
+ $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
+ pid_producer=$!
+}
+
+# In case the consumer does not consume, the test may exit prematurely (i.e.,
+# shut down the kafka brokers, and ProducerPerformance will start throwing ugly
+# exceptions. So, wait for the producer to finish before shutting down. If it
+# takes too long, the user can just hit Ctrl-c which is trapped to kill child
+# processes.
+# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
+wait_partition_done() {
+ n_tuples=$(($# / 3))
+
+ i=1
+ while (($#)); do
+ kafka_server[i]=$1
+ topic[i]=$2
+ partitionid[i]=$3
+ prev_offset[i]=0
+ info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
+ i=$((i+1))
+ shift 3
+ done
+
+ all_done=0
+
+ # set -x
+ while [[ $all_done != 1 ]]; do
+ sleep 4
+ i=$n_tuples
+ all_done=1
+ for ((i=1; i <= $n_tuples; i++)); do
+ cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
+ if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
+ all_done=0
+ prev_offset[i]=$cur_size
+ fi
+ done
+ done
+
+}
+
+cmp_logs() {
+ topic=$1
+ info "comparing source and target logs for topic $topic"
+ source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+ source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+ source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+ target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+ target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+ if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
+ if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
+ expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size))
+ actual_size=$(($target_part0_size + $target_part1_size))
+ if [ "x$expected_size" != "x$actual_size" ]
+ then
+ info "source size: $expected_size target size: $actual_size"
+ return 1
+ else
+ return 0
+ fi
+}
+
+take_fail_snapshot() {
+ snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
+ mkdir $snapshot_dir
+ for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do
+ if [ -d $dir ]; then
+ cp -r $dir $snapshot_dir
+ fi
+ done
+}
+
+# Usage: process_test_result <result> <action_on_fail>
+# result: last test result
+# action_on_fail: (exit|wait|proceed)
+# ("wait" is useful if you want to troubleshoot using zookeeper)
+process_test_result() {
+ result=$1
+ if [ $1 -eq 0 ]; then
+ info "test passed"
+ else
+ info "test failed"
+ case "$2" in
+ "wait") info "waiting: hit Ctrl-c to quit"
+ wait
+ ;;
+ "exit") shutdown_servers
+ take_fail_snapshot
+ exit $result
+ ;;
+ *) shutdown_servers
+ take_fail_snapshot
+ info "proceeding"
+ ;;
+ esac
+ fi
+}
+
+test_whitelists() {
+ info "### Testing whitelists"
+ snapshot_prefix="whitelist-test"
+
+ cleanup
+ start_zk
+ start_source_servers
+ start_target_servers_for_whitelist_test
+ sleep 4
+
+ begin_timer
+
+ start_producer test01
+ info "waiting for producer to finish producing ..."
+ wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
+
+ info "waiting for consumer to finish consuming ..."
+ wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
+
+ end_timer
+ info "embedded consumer took $((t_end - t_begin)) seconds"
+
+ sleep 2
+
+ cmp_logs test01
+ result=$?
+
+ return $result
+}
+
+test_blacklists() {
+ info "### Testing blacklists"
+ snapshot_prefix="blacklist-test"
+ cleanup
+ start_zk
+ start_source_servers
+ start_target_servers_for_blacklist_test
+ sleep 4
+
+ start_producer test02
+ info "waiting for producer to finish producing test02 ..."
+ wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0
+
+ # start_producer test03
+ # info "waiting for producer to finish producing test03 ..."
+ # wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0
+
+ begin_timer
+
+ start_producer test01
+ info "waiting for producer to finish producing ..."
+ wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
+
+ info "waiting for consumer to finish consuming ..."
+ wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
+
+ end_timer
+
+ info "embedded consumer took $((t_end - t_begin)) seconds"
+
+ sleep 2
+
+ cmp_logs test02
+ result1=$?
+ # cmp_logs test03
+ # result2=$?
+ # if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then
+ if [[ "x$result1" == "x0" ]]; then
+ result=1
+ else
+ cmp_logs test01
+ result=$?
+ fi
+
+ return $result
+}
+
+# main test begins
+
+echo "Test-$test_start_time"
+
+# Ctrl-c trap. Catches INT signal
+trap "shutdown_servers; exit 0" INT
+
+test_whitelists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
-echo "stopping the servers"
-ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
sleep 2
-ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
+
+test_blacklists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+exit $result
+
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.blacklist=test02,test03
+
Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties Wed Aug 10 22:32:23 2011
@@ -1,14 +0,0 @@
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
-
-#consumer group id
-groupid=group1
-
-embeddedconsumer.topics=test01:1
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,13 @@
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+producer.type=async
+
+# to avoid dropping events if the queue is full, wait indefinitely
+queue.enqueueTimeout.ms=-1
+
Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.whitelist=test01:1
+