You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/11 00:32:23 UTC

svn commit: r1156393 - in /incubator/kafka/trunk: core/src/main/scala/kafka/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/server/ project/build/ system_test/...

Author: junrao
Date: Wed Aug 10 22:32:23 2011
New Revision: 1156393

URL: http://svn.apache.org/viewvc?rev=1156393&view=rev
Log:
auto-discovery of topics for mirroring; patched by Joel; reviewed by Jun; KAFKA-74

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
    incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties
    incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties
    incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/trunk/project/build/KafkaProject.scala
    incubator/kafka/trunk/system_test/embedded_consumer/README
    incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
    incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
    incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
    incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
    incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Wed Aug 10 22:32:23 2011
@@ -19,6 +19,7 @@ package kafka
 
 import consumer.ConsumerConfig
 import org.apache.log4j.Logger
+import producer.ProducerConfig
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.Utils
 import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -30,21 +31,23 @@ object Kafka {
     val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
     Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
 
-    if(args.length != 1 && args.length != 2) {
-      println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties")
+    if (!List(1, 3).contains(args.length)) {
+      println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
       System.exit(1)
     }
   
     try {
-      var kafkaServerStartble: KafkaServerStartable = null
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
-      if (args.length == 2) {
-        val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
-        kafkaServerStartble = new KafkaServerStartable(serverConfig, consumerConfig)
+
+      val kafkaServerStartble = args.length match {
+        case 3 =>
+          val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
+          val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
+          new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
+        case 1 =>
+          new KafkaServerStartable(serverConfig)
       }
-      else
-        kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c
       Runtime.getRuntime().addShutdownHook(new Thread() {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Aug 10 22:32:23 2011
@@ -20,7 +20,7 @@ package kafka.consumer
 import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
-
+import kafka.common.InvalidConfigException
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
@@ -32,7 +32,11 @@ object ConsumerConfig {
   val MaxQueuedChunks = 100
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
-  val EmbeddedConsumerTopics = ""
+  val MirrorTopicsWhitelist = ""
+  val MirrorTopicsBlacklist = ""
+
+  val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
+  val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
 }
 
 class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -80,7 +84,18 @@ class ConsumerConfig(props: Properties) 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
 
-  /* embed a consumer in the broker. e.g., topic1:1,topic2:1 */
-  val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics",
-    EmbeddedConsumerTopics))
+  /** Whitelist of topics for this mirror's embedded consumer to consume. At
+   *  most one of whitelist/blacklist may be specified.
+   *  e.g., topic1:1,topic2:1 */
+  val mirrorTopicsWhitelistMap = Utils.getConsumerTopicMap(Utils.getString(
+    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist))
+ 
+  /** Topics to skip mirroring. At most one of whitelist/blacklist may be
+   *  specified */
+  val mirrorTopicsBlackList = Utils.getString(
+    props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
+
+  if (mirrorTopicsWhitelistMap.nonEmpty && mirrorTopicsBlackList.nonEmpty)
+      throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
 }
+

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicEventHandler.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+trait TopicEventHandler[T] {
+
+  def handleTopicEvent(allTopics: Seq[T])
+
+}

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Aug 10 22:32:23 2011
@@ -28,6 +28,7 @@ import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.api.OffsetRequest
+import java.util.UUID
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -156,7 +157,10 @@ private[kafka] class ZookeeperConsumerCo
       case Some(consumerId) // for testing only 
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
-      => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis
+      => val uuid = UUID.randomUUID()
+        consumerUuid = "%s-%d-%s".format(
+          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+          uuid.getMostSignificantBits().toHexString.substring(0,8))
     }
     val consumerIdString = config.groupId + "_" + consumerUuid
     val topicCount = new TopicCount(consumerIdString, topicCountMap)
@@ -164,6 +168,11 @@ private[kafka] class ZookeeperConsumerCo
     // listener to consumer and partition changes
     val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
     registerConsumerInZK(dirs, consumerIdString, topicCount)
+
+    // register listener for session expired event
+    zkClient.subscribeStateChanges(
+      new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
+
     zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
     // create a queue per topic per consumer thread
@@ -184,10 +193,6 @@ private[kafka] class ZookeeperConsumerCo
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 
-    // register listener for session expired event
-    zkClient.subscribeStateChanges(
-      new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
-
     // explicitly trigger load balancing for this consumer
     loadBalancerListener.syncedRebalance()
     ret

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.apache.log4j.Logger
+import scala.collection.JavaConversions._
+import kafka.utils.{Utils, ZkUtils, ZKStringSerializer}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
+    val eventHandler: TopicEventHandler[String]) {
+
+  private val logger = Logger.getLogger(getClass)
+
+  val lock = new Object()
+
+  private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, ZKStringSerializer)
+
+  startWatchingTopicEvents()
+
+  private def startWatchingTopicEvents() {
+    val topicEventListener = new ZkTopicEventListener
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+
+    zkClient.subscribeStateChanges(
+      new ZkSessionExpireListener(topicEventListener))
+
+    val topics = zkClient.subscribeChildChanges(
+      ZkUtils.BrokerTopicsPath, topicEventListener).toList
+
+    // call to bootstrap topic list
+    topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
+  }
+
+  private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() }
+
+  def shutdown() {
+    lock.synchronized {
+      try {
+        if (zkClient != null) {
+          stopWatchingTopicEvents()
+          zkClient.close()
+          zkClient = null
+        }
+        else
+          logger.warn("Cannot shutdown already shutdown topic event watcher.")
+      }
+      catch {
+        case e =>
+          logger.fatal(e)
+          logger.fatal(Utils.stackTrace(e))
+      }
+    }
+  }
+
+  class ZkTopicEventListener() extends IZkChildListener {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parent: String, children: java.util.List[String]) {
+      lock.synchronized {
+        try {
+          if (zkClient != null) {
+            val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+            logger.debug("all topics: %s".format(latestTopics))
+
+            eventHandler.handleTopicEvent(latestTopics)
+          }
+        }
+        catch {
+          case e =>
+            logger.fatal(e)
+            logger.fatal(Utils.stackTrace(e))
+        }
+      }
+    }
+
+  }
+
+  class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
+    extends IZkStateListener {
+
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) { }
+
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      lock.synchronized {
+        if (zkClient != null) {
+          logger.info(
+            "ZK expired: resubscribing topic event listener to topic registry")
+          zkClient.subscribeChildChanges(
+            ZkUtils.BrokerTopicsPath, topicEventListener)
+        }
+      }
+    }
+  }
+}
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Aug 10 22:32:23 2011
@@ -22,7 +22,7 @@ import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.common.InvalidConfigException
 
-class ProducerConfig(val props: Properties) extends ZKConfig(props) 
+class ProducerConfig(val props: Properties) extends ZKConfig(props)
         with AsyncProducerConfigShared with SyncProducerConfigShared{
 
   /** For bypassing zookeeper based auto partition discovery, use this config   *

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Wed Aug 10 22:32:23 2011
@@ -17,7 +17,7 @@
 
 package kafka.producer.async
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.utils.Utils
 import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.log4j.{Level, Logger}
@@ -90,7 +90,28 @@ private[kafka] class AsyncProducer[T](co
     if(cbkHandler != null)
       data = cbkHandler.beforeEnqueue(data)
 
-    val added = queue.offer(data)
+    val added = if (config.enqueueTimeoutMs != 0) {
+      try {
+        if (config.enqueueTimeoutMs < 0) {
+          queue.put(data)
+          true
+        }
+        else {
+          queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+        }
+      }
+      catch {
+        case e: InterruptedException =>
+          val msg = "%s interrupted during enqueue of event %s.".format(
+            getClass.getSimpleName, event.toString)
+          logger.error(msg)
+          throw new AsyncProducerInterruptedException(msg)
+      }
+    }
+    else {
+      queue.offer(data)
+    }
+
     if(cbkHandler != null)
       cbkHandler.afterEnqueue(data, added)
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Wed Aug 10 22:32:23 2011
@@ -33,6 +33,14 @@ trait AsyncProducerConfigShared {
   /** the maximum size of the blocking queue for buffering on the producer */
   val queueSize = Utils.getInt(props, "queue.size", 10000)
 
+  /**
+   * Timeout for event enqueue:
+   * 0: events will be enqueued immediately or dropped if the queue is full
+   * -ve: enqueue will block indefinitely if the queue is full
+   * +ve: enqueue will block up to this many milliseconds if the queue is full
+   */
+  val enqueueTimeoutMs = Utils.getInt(props, "queue.enqueueTimeout.ms", 0)
+
   /** the number of messages batched at the producer */
   val batchSize = Utils.getInt(props, "batch.size", 200)
 

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala Wed Aug 10 22:32:23 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer.async
+
+class AsyncProducerInterruptedException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Wed Aug 10 22:32:23 2011
@@ -17,24 +17,29 @@
 
 package kafka.server
 
+import kafka.utils.Utils
+import kafka.consumer._
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.message.Message
 import org.apache.log4j.Logger
-import kafka.consumer.{Consumer, ConsumerConnector, ConsumerConfig}
-import kafka.utils.{SystemTime, Utils}
-import kafka.api.RequestKeys
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
 
-class KafkaServerStartable(val serverConfig: KafkaConfig, val consumerConfig: ConsumerConfig) {
+import scala.collection.Map
+
+class KafkaServerStartable(val serverConfig: KafkaConfig,
+                           val consumerConfig: ConsumerConfig,
+                           val producerConfig: ProducerConfig) {
   private var server : KafkaServer = null
   private var embeddedConsumer : EmbeddedConsumer = null
 
   init
 
-  def this(serverConfig: KafkaConfig) = this(serverConfig, null)
+  def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
 
   private def init() {
     server = new KafkaServer(serverConfig)
     if (consumerConfig != null)
-      embeddedConsumer = new EmbeddedConsumer(consumerConfig, server)
+      embeddedConsumer =
+        new EmbeddedConsumer(consumerConfig, producerConfig, server)
   }
 
   def startup() {
@@ -55,43 +60,106 @@ class KafkaServerStartable(val serverCon
 }
 
 class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
-                       private val kafkaServer: KafkaServer) {
-  private val logger = Logger.getLogger(getClass())
-  private val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-  private val topicMessageStreams = consumerConnector.createMessageStreams(consumerConfig.embeddedConsumerTopicMap)
-
-  def startup() = {
-    var threadList = List[Thread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= Utils.newThread("kafka-embedded-consumer-" + topic + "-" + i, new Runnable() {
-          def run() {
-            logger.info("starting consumer thread " + i + " for topic " + topic)
-            val logManager = kafkaServer.getLogManager
-            val stats = kafkaServer.getStats
-            try {
-              for (message <- streamList(i)) {
-                val partition = logManager.chooseRandomPartition(topic)
-                val start = SystemTime.nanoseconds
-                logManager.getOrCreateLog(topic, partition).append(
-                                                          new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                          messages = message))
-                stats.recordRequest(RequestKeys.Produce, SystemTime.nanoseconds - start)
+                       private val producerConfig: ProducerConfig,
+                       private val kafkaServer: KafkaServer) extends TopicEventHandler[String] {
+
+  private val logger = Logger.getLogger(getClass)
+
+  private val blackListTopics =
+    consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
+
+  // mirrorTopics should be accessed by handleTopicEvent only
+  private var mirrorTopics:Seq[String] = List()
+
+  private var consumerConnector: ConsumerConnector = null
+  private var topicEventWatcher:ZookeeperTopicEventWatcher = null
+
+  private val producer = new Producer[Null, Message](producerConfig)
+
+
+  private def isTopicAllowed(topic: String) = {
+    if (consumerConfig.mirrorTopicsWhitelistMap.nonEmpty)
+      consumerConfig.mirrorTopicsWhitelistMap.contains(topic)
+    else
+      !blackListTopics.contains(topic)
+  }
+
+  // TopicEventHandler call-back only
+  @Override
+  def handleTopicEvent(allTopics: Seq[String]) {
+    val newMirrorTopics = allTopics.filter(isTopicAllowed)
+
+    val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
+    if (addedTopics.nonEmpty)
+      logger.info("topic event: added topics = %s".format(addedTopics))
+
+    val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
+    if (deletedTopics.nonEmpty)
+      logger.info("topic event: deleted topics = %s".format(deletedTopics))
+
+    mirrorTopics = newMirrorTopics
+
+    if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
+      logger.info("mirror topics = %s".format(mirrorTopics))
+      startNewConsumerThreads(makeTopicMap(mirrorTopics))
+    }
+  }
+
+  private def makeTopicMap(mirrorTopics: Seq[String]) = {
+    if (mirrorTopics.nonEmpty)
+      Utils.getConsumerTopicMap(mirrorTopics.mkString("", ":1,", ":1"))
+    else
+      Utils.getConsumerTopicMap("")
+  }
+
+  private def startNewConsumerThreads(topicMap: Map[String, Int]) {
+    if (topicMap.nonEmpty) {
+      if (consumerConnector != null)
+        consumerConnector.shutdown()
+      consumerConnector = Consumer.create(consumerConfig)
+      val topicMessageStreams =  consumerConnector.createMessageStreams(topicMap)
+      var threadList = List[Thread]()
+      for ((topic, streamList) <- topicMessageStreams)
+        for (i <- 0 until streamList.length)
+          threadList ::= Utils.newThread("kafka-embedded-consumer-%s-%d".format(topic, i), new Runnable() {
+            def run() {
+              logger.info("Starting consumer thread %d for topic %s".format(i, topic))
+
+              try {
+                for (message <- streamList(i)) {
+                  val pd = new ProducerData[Null, Message](topic, message)
+                  producer.send(pd)
+                }
+              }
+              catch {
+                case e =>
+                  logger.fatal(e + Utils.stackTrace(e))
+                  logger.fatal(topic + " stream " + i + " unexpectedly exited")
               }
             }
-            catch {
-              case e =>
-                logger.fatal(e + Utils.stackTrace(e))
-                logger.fatal(topic + " stream " + i + " unexpectedly exited")
-            }
-          }
-        }, false)
+          }, false)
 
-    for (thread <- threadList)
-      thread.start
+      for (thread <- threadList)
+        thread.start()
+    }
+    else
+      logger.info("Not starting consumer threads (mirror topic list is empty)")
   }
 
-  def shutdown() = {
-    consumerConnector.shutdown
+  def startup() {
+    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
+    /*
+     * consumer threads are (re-)started upon topic events (which includes an
+     * initial startup event which lists the current topics)
+     */
+   }
+
+  def shutdown() {
+    producer.close()
+    if (consumerConnector != null)
+      consumerConnector.shutdown()
+    if (topicEventWatcher != null)
+      topicEventWatcher.shutdown()
   }
 }
+

Modified: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (original)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Wed Aug 10 22:32:23 2011
@@ -103,6 +103,8 @@ class KafkaProject(info: ProjectInfo) ex
 
     override def javaCompileOptions = super.javaCompileOptions ++
       List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
+
+    override def packageAction = super.packageAction dependsOn (testCompileAction)
   }
 
   class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)

Modified: incubator/kafka/trunk/system_test/embedded_consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/README?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/README (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/README Wed Aug 10 22:32:23 2011
@@ -1,8 +1,27 @@
-This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer.
-At the end, the messages produced at the source brokers should match that at the target brokers.
+This test replicates messages from 3 kafka brokers to 2 other kafka brokers
+using the embedded consumer.  At the end, the messages produced at the source
+brokers should match that at the target brokers.
 
 To run this test, do
 bin/run-test.sh
 
-The expected output is given in bin/expected.out. There is only 1 thing that's important.
+The expected output is given in bin/expected.out. There is only 1 thing that's
+important.
 1. The output should have a line "test passed".
+
+In the event of failure, by default the brokers and zookeepers remain running
+to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
+change this behavior by setting the action_on_fail flag in the script to "exit"
+or "proceed", in which case a snapshot of all the logs and directories is
+placed in the test's base directory.
+
+If you are making any changes that may affect the embedded consumer, it is a
+good idea to run the test in a loop. E.g.:
+
+:>/tmp/embeddedconsumer_test.log
+for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/embeddedconsumer_test.log; done
+tail -F /tmp/embeddedconsumer_test.log
+
+grep -ic passed /tmp/embeddedconsumer_test.log
+grep -ic failed /tmp/embeddedconsumer_test.log
+

Modified: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Wed Aug 10 22:32:23 2011
@@ -1,79 +1,314 @@
 #!/bin/bash
 
-num_messages=400000
-message_size=400
+readonly num_messages=400000
+readonly message_size=400
+readonly action_on_fail="proceed"
 
-base_dir=$(dirname $0)/..
+readonly test_start_time="$(date +%s)"
 
-rm -rf /tmp/zookeeper_source
-rm -rf /tmp/zookeeper_target
-rm -rf /tmp/kafka-source1-logs
-mkdir /tmp/kafka-source1-logs
-mkdir /tmp/kafka-source1-logs/test01-0
-touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-source2-logs
-mkdir /tmp/kafka-source2-logs
-mkdir /tmp/kafka-source2-logs/test01-0
-touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-source3-logs
-mkdir /tmp/kafka-source3-logs
-mkdir /tmp/kafka-source3-logs/test01-0
-touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
-rm -rf /tmp/kafka-target1-logs
-rm -rf /tmp/kafka-target2-logs
-
-echo "start the servers ..."
-$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
-$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
-$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
-
-sleep 4
-echo "start producing messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 &
-
-echo "wait for consumer to finish consuming ..."
-cur1_offset="-1"
-cur2_offset="-1"
-quit1=0
-quit2=0
-while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ]
-do
-  sleep 2
-  target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-  if [ $target1_size -eq $cur1_offset ]
-  then
-    quit1=1
-  fi
-  cur1_offset=$target1_size
-  target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-  if [ $target2_size -eq $cur2_offset ]
-  then
-    quit2=1
-  fi
-  cur2_offset=$target2_size
-done
+readonly base_dir=$(dirname $0)/..
 
-sleep 2
-source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
-
-expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size`
-actual_size=`expr $target_part0_size + $target_part1_size`
-if [ $expected_size != $actual_size ]
-then
-   echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
-else
-   echo "test passed"
-fi
+info() {
+    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+kill_child_processes() {
+    isTopmost=$1
+    curPid=$2
+    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+    for childPid in $childPids
+    do
+        kill_child_processes 0 $childPid
+    done
+    if [ $isTopmost -eq 0 ]; then
+        kill -15 $curPid 2> /dev/null
+    fi
+}
+
+cleanup() {
+    info "cleaning up"
+
+    pid_zk_source=
+    pid_zk_target=
+    pid_kafka_source1=
+    pid_kafka_source2=
+    pid_kafka_source3=
+    pid_kafka_target1=
+    pid_kafka_target2=
+    pid_producer=
+
+    rm -rf /tmp/zookeeper_source
+    rm -rf /tmp/zookeeper_target
+
+    rm -rf /tmp/kafka-source{1..3}-logs
+    # mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0
+    # touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka
+
+    rm -rf /tmp/kafka-target{1..2}-logs
+}
+
+begin_timer() {
+    t_begin=$(date +%s)
+}
+
+end_timer() {
+    t_end=$(date +%s)
+}
+
+start_zk() {
+    info "starting zookeepers"
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
+    pid_zk_source=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+    pid_zk_target=$!
+}
+
+start_source_servers() {
+    info "starting source cluster"
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
+    pid_kafka_source1=$!
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
+    pid_kafka_source2=$!
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
+    pid_kafka_source3=$!
+}
+
+start_target_servers_for_whitelist_test() {
+    echo "starting mirror cluster"
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
+    pid_kafka_target1=$!
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
+    pid_kafka_target2=$!
+}
+
+start_target_servers_for_blacklist_test() {
+    echo "starting mirror cluster"
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
+    pid_kafka_target1=$!
+    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
+    pid_kafka_target2=$!
+}
+
+shutdown_servers() {
+    info "stopping producer"
+    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
+
+    info "shutting down target servers"
+    if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi
+    if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi
+    sleep 2
+
+    info "shutting down source servers"
+    if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi
+    if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi
+    if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi
+
+    info "shutting down zookeeper servers"
+    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
+    if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
+}
+
+start_producer() {
+    topic=$1
+    info "start producing messages for topic $topic ..."
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log &
+    pid_producer=$!
+}
+
+# In case the consumer does not consume, the test may exit prematurely (i.e.,
+# shut down the kafka brokers, and ProducerPerformance will start throwing ugly
+# exceptions. So, wait for the producer to finish before shutting down. If it
+# takes too long, the user can just hit Ctrl-c which is trapped to kill child
+# processes.
+# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
+wait_partition_done() {
+    n_tuples=$(($# / 3))
+
+    i=1
+    while (($#)); do
+        kafka_server[i]=$1
+        topic[i]=$2
+        partitionid[i]=$3
+        prev_offset[i]=0
+        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
+        i=$((i+1))
+        shift 3
+    done
+
+    all_done=0
+
+    # set -x
+    while [[ $all_done != 1 ]]; do
+        sleep 4
+        i=$n_tuples
+        all_done=1
+        for ((i=1; i <= $n_tuples; i++)); do
+            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
+            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
+                all_done=0
+                prev_offset[i]=$cur_size
+            fi
+        done
+    done
+
+}
+
+cmp_logs() {
+    topic=$1
+    info "comparing source and target logs for topic $topic"
+    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
+    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
+    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size))
+    actual_size=$(($target_part0_size + $target_part1_size))
+    if [ "x$expected_size" != "x$actual_size" ]
+    then
+        info "source size: $expected_size target size: $actual_size"
+        return 1
+    else
+        return 0
+    fi
+}
+
+take_fail_snapshot() {
+    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
+    mkdir $snapshot_dir
+    for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do
+        if [ -d $dir ]; then
+            cp -r $dir $snapshot_dir
+        fi
+    done
+}
+
+# Usage: process_test_result <result> <action_on_fail>
+# result: last test result
+# action_on_fail: (exit|wait|proceed)
+# ("wait" is useful if you want to troubleshoot using zookeeper)
+process_test_result() {
+    result=$1
+    if [ $1 -eq 0 ]; then
+        info "test passed"
+    else
+        info "test failed"
+        case "$2" in
+            "wait") info "waiting: hit Ctrl-c to quit"
+                wait
+                ;;
+            "exit") shutdown_servers
+                take_fail_snapshot
+                exit $result
+                ;;
+            *) shutdown_servers
+                take_fail_snapshot
+                info "proceeding"
+                ;;
+        esac
+    fi
+}
+
+test_whitelists() {
+    info "### Testing whitelists"
+    snapshot_prefix="whitelist-test"
+
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers_for_whitelist_test
+    sleep 4
+
+    begin_timer
+
+    start_producer test01
+    info "waiting for producer to finish producing ..."
+    wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
+
+    info "waiting for consumer to finish consuming ..."
+    wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
+
+    end_timer
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    cmp_logs test01
+    result=$?
+
+    return $result
+}
+
+test_blacklists() {
+    info "### Testing blacklists"
+    snapshot_prefix="blacklist-test"
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers_for_blacklist_test
+    sleep 4
+
+    start_producer test02
+    info "waiting for producer to finish producing test02 ..."
+    wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0
+
+    # start_producer test03
+    # info "waiting for producer to finish producing test03 ..."
+    # wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0
+
+    begin_timer
+
+    start_producer test01
+    info "waiting for producer to finish producing ..."
+    wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
+
+    info "waiting for consumer to finish consuming ..."
+    wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
+
+    end_timer
+
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    cmp_logs test02
+    result1=$?
+    # cmp_logs test03
+    # result2=$?
+    # if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then
+    if [[ "x$result1" == "x0" ]]; then
+        result=1
+    else
+        cmp_logs test01
+        result=$?
+    fi
+
+    return $result
+}
+
+# main test begins
+
+echo "Test-$test_start_time"
+
+# Ctrl-c trap. Catches INT signal
+trap "shutdown_servers; exit 0" INT
+
+test_whitelists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
 
-echo "stopping the servers"
-ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
 sleep 2
-ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
+
+test_blacklists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+exit $result
+

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/blacklisttest.consumer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.blacklist=test02,test03
+

Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties Wed Aug 10 22:32:23 2011
@@ -1,14 +0,0 @@
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
-
-#consumer group id
-groupid=group1
-
-embeddedconsumer.topics=test01:1

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/mirror_producer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,13 @@
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+producer.type=async
+
+# to avoid dropping events if the queue is full, wait indefinitely
+queue.enqueueTimeout.ms=-1
+

Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4

Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4

Modified: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties?rev=1156393&r1=1156392&r2=1156393&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties (original)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties Wed Aug 10 22:32:23 2011
@@ -60,5 +60,3 @@ log.default.flush.interval.ms=1000
 # time based topic flasher time rate in ms
 log.default.flush.scheduler.interval.ms=1000
 
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties?rev=1156393&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/whitelisttest.consumer.properties Wed Aug 10 22:32:23 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.whitelist=test01:1
+