You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2012/05/31 03:51:27 UTC

svn commit: r1344526 [2/3] - in /incubator/kafka/branches/0.8: ./ clients/cpp/src/ contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/ contrib/hadoop-producer/src/main/java/kaf...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu May 31 01:51:23 2012
@@ -19,6 +19,7 @@ package kafka.consumer
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import locks.ReentrantLock
 import scala.collection._
 import kafka.cluster._
 import kafka.utils._
@@ -33,6 +34,7 @@ import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
 import kafka.common.{NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
 
+
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -85,16 +87,37 @@ trait ZookeeperConsumerConnectorMBean {
 
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-  extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
-
+        extends ConsumerConnector with ZookeeperConsumerConnectorMBean
+        with Logging {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
   private var zkClient: ZkClient = null
-  private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-  // queues : (topic,consumerThreadId) -> queue
-  private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+  private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
+  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  private val messageStreamCreated = new AtomicBoolean(false)
+
+  private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var loadBalancerListener: ZKRebalancerListener = null
+
+  private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+
+  val consumerIdString = {
+    var consumerUuid : String = null
+    config.consumerId match {
+      case Some(consumerId) // for testing only
+      => consumerUuid = consumerId
+      case None // generate unique consumerId automatically
+      => val uuid = UUID.randomUUID()
+      consumerUuid = "%s-%d-%s".format(
+        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+        uuid.getMostSignificantBits().toHexString.substring(0,8))
+    }
+    config.groupId + "_" + consumerUuid
+  }
+  this.logIdent = consumerIdString + " "
 
   connectZk()
   createFetcher()
@@ -106,10 +129,18 @@ private[kafka] class ZookeeperConsumerCo
   def this(config: ConsumerConfig) = this(config, true)
 
   def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
-      : Map[String,List[KafkaMessageStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException(this.getClass.getSimpleName +
+                                   " can create message streams at most once")
     consume(topicCountMap, decoder)
   }
 
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
+    val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+    wildcardStreamsHandler.streams
+  }
+
   private def createFetcher() {
     if (enableFetcher)
       fetcher = Some(new Fetcher(config, zkClient))
@@ -124,6 +155,9 @@ private[kafka] class ZookeeperConsumerCo
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
+
+      if (wildcardTopicWatcher != null)
+        wildcardTopicWatcher.shutdown()
       try {
         scheduler.shutdownNow()
         fetcher match {
@@ -146,59 +180,30 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
-      : Map[String,List[KafkaMessageStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
     debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
-    val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
-
-    var consumerUuid : String = null
-    config.consumerId match {
-      case Some(consumerId) => // for testing only
-        consumerUuid = consumerId
-      case None => // generate unique consumerId automatically
-        val uuid = UUID.randomUUID()
-        consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName,
-                                          System.currentTimeMillis,
-                                          uuid.getMostSignificantBits().toHexString.substring(0,8) )
-    }
-    val consumerIdString = config.groupId + "_" + consumerUuid
-    val topicCount = new TopicCount(consumerIdString, topicCountMap)
-
-    // create a queue per topic per consumer thread
-    val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
-    for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream[T]] = Nil
-      for (threadId <- threadIdSet) {
-        val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
-      }
-      ret += (topic -> streamList)
-      debug("adding topic " + topic + " and stream to map..")
-    }
+    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
 
-    // listener to consumer and partition changes
-    val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
-    registerConsumerInZK(dirs, consumerIdString, topicCount)
+    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
 
-    // register listener for session expired event
-    zkClient.subscribeStateChanges(
-      new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
-
-    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+    // make a list of (queue,stream) pairs, one pair for each threadId
+    val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
+      threadIdSet.map(_ => {
+        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val stream = new KafkaStream[T](
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        (queue, stream)
+      })
+    ).flatten.toList
 
-    ret.foreach { topicAndStreams =>
-      // register on broker partition path changes
-      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
-    }
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, topicCount)
+    reinitializeConsumer(topicCount, queuesAndStreams)
 
-    // explicitly trigger load balancing for this consumer
-    loadBalancerListener.syncedRebalance()
-    ret
+    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
   }
 
   // this API is used by unit tests only
@@ -206,12 +211,14 @@ private[kafka] class ZookeeperConsumerCo
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    createEphemeralPathExpectConflict(zkClient,
+                                      dirs.consumerRegistryDir + "/" + consumerIdString,
+                                      topicCount.dbString)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {
-    for (queue <- queues.values) {
+    for (queue <- topicThreadIdAndQueues.values) {
       debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)
@@ -330,10 +337,10 @@ private[kafka] class ZookeeperConsumerCo
     producedOffset
   }
 
-  class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
+  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener[T])
+                                 val loadBalancerListener: ZKRebalancerListener)
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
@@ -355,10 +362,10 @@ private[kafka] class ZookeeperConsumerCo
        *  consumer in the consumer registry and trigger a rebalance.
        */
       info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
-      loadBalancerListener.resetState
+      loadBalancerListener.resetState()
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
-      loadBalancerListener.syncedRebalance
+      loadBalancerListener.syncedRebalance()
 
       // There is no need to resubscribe to child and state changes.
       // The child change watchers will be set inside rebalance when we read the children list.
@@ -366,45 +373,69 @@ private[kafka] class ZookeeperConsumerCo
 
   }
 
-  class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
-                                kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
+  class ZKRebalancerListener(val group: String, val consumerIdString: String,
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
     extends IZkChildListener {
-    private val dirs = new ZKGroupDirs(group)
     private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
     private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+    private var isWatcherTriggered = false
+    private val lock = new ReentrantLock
+    private val cond = lock.newCondition()
+    private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
+      override def run() {
+        info("starting watcher executor thread for consumer " + consumerIdString)
+        var doRebalance = false
+        while (!isShuttingDown.get) {
+          try {
+            lock.lock()
+            try {
+              if (!isWatcherTriggered)
+                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
+            } finally {
+              doRebalance = isWatcherTriggered
+              isWatcherTriggered = false
+              lock.unlock()
+            }
+            if (doRebalance)
+              syncedRebalance
+          } catch {
+            case t => error("error during syncedRebalance", t)
+          }
+        }
+        info("stopping watcher executor thread for consumer " + consumerIdString)
+      }
+    }
+    watcherExecutorThread.start()
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      syncedRebalance
+      lock.lock()
+      try {
+        isWatcherTriggered = true
+        cond.signalAll()
+      } finally {
+        lock.unlock()
+      }
     }
 
-    private def releasePartitionOwnership()= {
-      info("Releasing partition ownership")
-      for ((topic, infos) <- topicRegistry) {
-        for(partition <- infos.keys) {
-          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
-          deletePath(zkClient, partitionOwnerPath)
-          debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
-        }
-      }
+    private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+      val topicDirs = new ZKGroupTopicDirs(group, topic)
+      val znode = topicDirs.consumerOwnerDir + "/" + partition
+      deletePath(zkClient, znode)
+      debug("Consumer " + consumerIdString + " releasing " + znode)
     }
 
-    private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
-                                    newPartMap: Map[String, Seq[String]],
-                                    oldPartMap: Map[String, Seq[String]],
-                                    newConsumerMap: Map[String,List[String]],
-                                    oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
-      var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
-      for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
-        if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
-          relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
-      relevantTopicThreadIdsMap
+    private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= {
+      info("Releasing partition ownership")
+      for ((topic, infos) <- localTopicRegistry) {
+        for(partition <- infos.keys)
+          deletePartitionOwnershipFromZK(topic, partition.toString)
+        localTopicRegistry.remove(topic)
+      }
     }
 
     def resetState() {
       topicRegistry.clear
-      oldConsumersPerTopicMap.clear
-      oldPartitionsPerTopicMap.clear
     }
 
     def syncedRebalance() {
@@ -422,8 +453,6 @@ private[kafka] class ZookeeperConsumerCo
                * the value of a child. Just let this go since another rebalance will be triggered.
                **/
               info("exception during rebalance ", e)
-              /* Explicitly make sure another rebalancing attempt will get triggered. */
-              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
@@ -432,15 +461,9 @@ private[kafka] class ZookeeperConsumerCo
               /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                * clear the cache */
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
-              oldConsumersPerTopicMap.clear()
-              oldPartitionsPerTopicMap.clear()
           }
-          // commit offsets
-          commitOffsets()
           // stop all fetchers and clear all the queues to avoid data duplication
-          closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
-          // release all partitions, reset state and retry
-          releasePartitionOwnership()
+          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -449,17 +472,9 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def rebalance(cluster: Cluster): Boolean = {
-      val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
-      val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
-      if (relevantTopicThreadIdsMap.size <= 0) {
-        info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
-          format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
-        debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
-        debug("Consumers per topic cache " + oldConsumersPerTopicMap)
-        return true
-      }
 
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
@@ -467,14 +482,15 @@ private[kafka] class ZookeeperConsumerCo
        * But if we don't stop the fetchers first, this consumer would continue returning data for released
        * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
        */
-      closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
+      closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
 
-      releasePartitionOwnership()
+      releasePartitionOwnership(topicRegistry)
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
-      for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
-        topicRegistry.remove(topic)
-        topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+      var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+
+      for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+        currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
@@ -502,11 +518,9 @@ private[kafka] class ZookeeperConsumerCo
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
-              if (!ownPartition)
-                return false
-              else // record the partition ownership decision
-                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+              // record the partition ownership decision
+              partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
           }
         }
@@ -520,21 +534,21 @@ private[kafka] class ZookeeperConsumerCo
         info("Updating the cache")
         debug("Partitions per topic cache " + partitionsPerTopicMap)
         debug("Consumers per topic cache " + consumersPerTopicMap)
-        oldPartitionsPerTopicMap = partitionsPerTopicMap
-        oldConsumersPerTopicMap = consumersPerTopicMap
-        updateFetcher(cluster, kafkaMessageStreams)
+        topicRegistry = currentTopicRegistry
+        updateFetcher(cluster)
         true
-      } else
+      } else {
         false
+      }
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
-                                       kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                                       messageStreams: Map[String,List[KafkaStream[_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
         info("Committing all offsets after clearing the fetcher queues")
         /**
         * here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -549,16 +563,15 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
-      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
-      closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+      val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+      closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
     }
 
-    private def updateFetcher[T](cluster: Cluster,
-                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+    private def updateFetcher(cluster: Cluster) {
       // update partitions for fetcher
       var allPartitionInfos : List[PartitionTopicInfo] = Nil
       for (partitionInfos <- topicRegistry.values)
@@ -569,33 +582,13 @@ private[kafka] class ZookeeperConsumerCo
 
       fetcher match {
         case Some(f) =>
-          f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
+          f.startConnections(allPartitionInfos, cluster)
         case None =>
       }
     }
 
-    private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
-                                 topic: String, consumerThreadId: String) : Boolean = {
-      val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-      // check if some other consumer owns this partition at this time
-      val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
-      if(currentPartitionOwner != null) {
-        if(currentPartitionOwner.equals(consumerThreadId)) {
-          info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
-          addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-          true
-        }
-        else {
-          info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
-          false
-        }
-      } else {
-        addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-        true
-      }
-    }
-
     private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      var successfullyOwnedPartitions : List[(String, String)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1._1
         val partition = partitionOwner._1._2
@@ -604,6 +597,7 @@ private[kafka] class ZookeeperConsumerCo
         try {
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+          successfullyOwnedPartitions ::= (topic, partition)
           true
         } catch {
           case e: ZkNodeExistsException =>
@@ -613,14 +607,20 @@ private[kafka] class ZookeeperConsumerCo
           case e2 => throw e2
         }
       }
-      val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
-      if(success > 0) false       /* even if one of the partition ownership attempt has failed, return false */
+      val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
+      /* even if one of the partition ownership attempt has failed, return false */
+      if(hasPartitionOwnershipFailed > 0) {
+        // remove all paths that we have owned in ZK
+        successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
+        false
+      }
       else true
     }
 
-    private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String,
+    private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
+                                      topicDirs: ZKGroupTopicDirs, partition: String,
                                       topic: String, consumerThreadId: String) {
-      val partTopicInfoMap = topicRegistry.get(topic)
+      val partTopicInfoMap = currentTopicRegistry.get(topic)
 
       // find the leader for this partition
       val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
@@ -646,8 +646,7 @@ private[kafka] class ZookeeperConsumerCo
         }
       else
         offset = offsetString.toLong
-
-      val queue = queues.get((topic, consumerThreadId))
+      val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
@@ -661,5 +660,155 @@ private[kafka] class ZookeeperConsumerCo
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }
+
+  private def reinitializeConsumer[T](
+      topicCount: TopicCount,
+      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+
+    val dirs = new ZKGroupDirs(config.groupId)
+
+    // listener to consumer and partition changes
+    if (loadBalancerListener == null) {
+      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+      loadBalancerListener = new ZKRebalancerListener(
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+    }
+
+    // register listener for session expired event
+    if (sessionExpirationListener == null)
+      sessionExpirationListener = new ZKSessionExpireListener(
+        dirs, consumerIdString, topicCount, loadBalancerListener)
+
+    val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
+
+    // map of {topic -> Set(thread-1, thread-2, ...)}
+    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+      topicCount.getConsumerThreadIdsPerTopic
+
+    /*
+     * This usage of map flatten breaks up consumerThreadIdsPerTopic into
+     * a set of (topic, thread-id) pairs that we then use to construct
+     * the updated (topic, thread-id) -> queues map
+     */
+    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
+
+    // iterator over (topic, thread-id) tuples
+    val topicThreadIds: Iterable[(String, String)] =
+      consumerThreadIdsPerTopic.flatten
+
+    // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
+    val threadQueueStreamPairs = topicCount match {
+      case wildTopicCount: WildcardTopicCount =>
+        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
+      case statTopicCount: StaticTopicCount => {
+        require(topicThreadIds.size == queuesAndStreams.size,
+          "Mismatch between thread ID count (%d) and queue count (%d)".format(
+          topicThreadIds.size, queuesAndStreams.size))
+        topicThreadIds.zip(queuesAndStreams)
+      }
+    }
+
+    threadQueueStreamPairs.foreach(e => {
+      val topicThreadId = e._1
+      val q = e._2._1
+      topicThreadIdAndQueues.put(topicThreadId, q)
+    })
+
+    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
+    groupedByTopic.foreach(e => {
+      val topic = e._1
+      val streams = e._2.map(_._2._2).toList
+      topicStreamsMap += (topic -> streams)
+      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+    })
+
+    // listener to consumer and partition changes
+    zkClient.subscribeStateChanges(sessionExpirationListener)
+
+    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+    topicStreamsMap.foreach { topicAndStreams =>
+      // register on broker partition path changes
+      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
+      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+    }
+
+    // explicitly trigger load balancing for this consumer
+    loadBalancerListener.syncedRebalance()
+  }
+
+  class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+                                  numStreams: Int,
+                                  decoder: Decoder[T])
+                                extends TopicEventHandler[String] {
+
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException("Each consumer connector can create " +
+        "message streams by filter at most once.")
+
+    private val wildcardQueuesAndStreams = (1 to numStreams)
+      .map(e => {
+        val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val stream = new KafkaStream[T](
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        (queue, stream)
+    }).toList
+
+     // bootstrap with existing topics
+    private var wildcardTopics =
+      getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+        .filter(topicFilter.isTopicAllowed)
+
+    private val wildcardTopicCount = TopicCount.constructTopicCount(
+      consumerIdString, topicFilter, numStreams, zkClient)
+
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
+    reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+
+    if (!topicFilter.requiresTopicEventWatcher) {
+      info("Not creating event watcher for trivial whitelist " + topicFilter)
+    }
+    else {
+      info("Creating topic event watcher for whitelist " + topicFilter)
+      wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
+
+      /*
+       * Topic events will trigger subsequent synced rebalances. Also, the
+       * consumer will get registered only after an allowed topic becomes
+       * available.
+       */
+    }
+
+    def handleTopicEvent(allTopics: Seq[String]) {
+      debug("Handling topic event")
+
+      val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+
+      val addedTopics = updatedTopics filterNot (wildcardTopics contains)
+      if (addedTopics.nonEmpty)
+        info("Topic event: added topics = %s"
+                             .format(addedTopics))
+
+      /*
+       * TODO: Deleted topics are interesting (and will not be a concern until
+       * 0.8 release). We may need to remove these topics from the rebalance
+       * listener's map in reinitializeConsumer.
+       */
+      val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
+      if (deletedTopics.nonEmpty)
+        info("Topic event: deleted topics = %s"
+                             .format(deletedTopics))
+
+      wildcardTopics = updatedTopics
+      info("Topics to consume = %s".format(wildcardTopics))
+
+      if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+        reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+    }
+
+    def streams: Seq[KafkaStream[T]] =
+      wildcardQueuesAndStreams.map(_._2)
+  }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Thu May 31 01:51:23 2012
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions.
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
 
 class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
-    val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+    val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val con
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
-    val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+    val topicEventListener = new ZkTopicEventListener()
     ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
 
     zkClient.subscribeStateChanges(
@@ -52,6 +50,7 @@ class ZookeeperTopicEventWatcher(val con
 
   def shutdown() {
     lock.synchronized {
+      info("Shutting down topic event watcher.")
       if (zkClient != null) {
         stopWatchingTopicEvents()
         zkClient.close()
@@ -62,7 +61,7 @@ class ZookeeperTopicEventWatcher(val con
     }
   }
 
-  class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+  class ZkTopicEventListener extends IZkChildListener {
 
     @throws(classOf[Exception])
     def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +75,8 @@ class ZookeeperTopicEventWatcher(val con
           }
         }
         catch {
-          case e: ConsumerRebalanceFailedException =>
-            fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
-            kafkaServerStartable.shutdown()
           case e =>
-            error("error in handling child changes in embedded consumer", e)
+            error("error in handling child changes", e)
         }
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu May 31 01:51:23 2012
@@ -17,34 +17,53 @@
 
 package kafka.javaapi.consumer;
 
-import kafka.consumer.KafkaMessageStream;
-import kafka.message.Message;
-import kafka.serializer.Decoder;
 
 import java.util.List;
 import java.util.Map;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
 
 public interface ConsumerConnector {
-    /**
-     *  Create a list of MessageStreams of type T for each topic.
-     *
-     *  @param topicCountMap  a map of (topic, #streams) pair
-     *  @param decoder a decoder that converts from Message to T
-     *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
-     *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
-     */
-    public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
-            Map<String, Integer> topicCountMap, Decoder<T> decoder);
-    public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
-            Map<String, Integer> topicCountMap);
-
-    /**
-     *  Commit the offsets of all broker partitions connected by this connector.
-     */
-    public void commitOffsets();
-
-    /**
-     *  Shut down the connector
-     */
-    public void shutdown();
+  /**
+   *  Create a list of MessageStreams of type T for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
+   */
+  public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
+      Map<String, Integer> topicCountMap, Decoder<T> decoder);
+  public Map<String, List<KafkaStream<Message>>> createMessageStreams(
+      Map<String, Integer> topicCountMap);
+
+  /**
+   *  Create a list of MessageAndTopicStreams containing messages of type T.
+   *
+   *  @param topicFilter a TopicFilter that specifies which topics to
+   *                    subscribe to (encapsulates a whitelist or a blacklist).
+   *  @param numStreams the number of message streams to return.
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a list of KafkaStream. Each stream supports an
+   *          iterator over its MessageAndMetadata elements.
+   */
+  public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter);
+
+  /**
+   *  Commit the offsets of all broker partitions connected by this connector.
+   */
+  public void commitOffsets();
+
+  /**
+   *  Shut down the connector
+   */
+  public void shutdown();
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Thu May 31 01:51:23 2012
@@ -16,9 +16,11 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import kafka.message.Message
 import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.consumer._
+import scala.collection.JavaConversions.asList
+
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -68,14 +70,14 @@ private[kafka] class ZookeeperConsumerCo
   def createMessageStreams[T](
         topicCountMap: java.util.Map[String,java.lang.Integer],
         decoder: Decoder[T])
-      : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
+      : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
     val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
+    val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
+      var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -85,9 +87,17 @@ private[kafka] class ZookeeperConsumerCo
 
   def createMessageStreams(
         topicCountMap: java.util.Map[String,java.lang.Integer])
-      : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+      : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder)
 
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
+    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+    createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+    createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
 
   def commitOffsets() {
     underlying.commitOffsets

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Thu May 31 01:51:23 2012
@@ -17,8 +17,10 @@
 
 package kafka.javaapi.message
 
+
 import kafka.message.{MessageAndOffset, InvalidMessageException}
 
+
 /**
  * A set of messages. A message set has a fixed serialized form, though the container
  * for the bytes could be either in-memory or on disk. A The format of each message is

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu May 31 01:51:23 2012
@@ -25,6 +25,7 @@ import kafka.utils._
 import kafka.common._
 import kafka.api.OffsetRequest
 import java.util._
+import kafka.server.BrokerTopicStat
 
 private[kafka] object Log {
   val FileSuffix = ".kafka"
@@ -199,7 +200,7 @@ private[kafka] class Log(val dir: File, 
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -208,13 +209,25 @@ private[kafka] class Log(val dir: File, 
       numberOfMessages += 1;
     }
 
+    BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-    
+
+    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }
@@ -247,10 +260,18 @@ private[kafka] class Log(val dir: File, 
       val deletable = view.takeWhile(predicate)
       for(seg <- deletable)
         seg.deleted = true
-      val numToDelete = deletable.size
+      var numToDelete = deletable.size
       // if we are deleting everything, create a new empty segment
-      if(numToDelete == view.size)
-        roll()
+      if(numToDelete == view.size) {
+        if (view(numToDelete - 1).size > 0)
+          roll()
+        else {
+          // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
+          // file name. So simply reuse the last segment and reset the modified time.
+          view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+          numToDelete -=1
+        }
+      }
       segments.trunc(numToDelete)
     }
   }
@@ -288,9 +309,12 @@ private[kafka] class Log(val dir: File, 
    */
   def roll() {
     lock synchronized {
-      val last = segments.view.last
       val newOffset = nextAppendOffset
       val newFile = new File(dir, Log.nameFromOffset(newOffset))
+      if (newFile.exists) {
+        warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
+        newFile.delete()
+      }
       debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu May 31 01:51:23 2012
@@ -18,10 +18,10 @@
 package kafka.message
 
 import kafka.utils.Logging
-import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer
 import java.nio.channels._
 import kafka.utils.IteratorTemplate
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
 
 /**
  * A sequence of messages stored in a byte buffer
@@ -36,8 +36,9 @@ import kafka.utils.IteratorTemplate
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
                            private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
-  private var validByteCount = -1L
   private var shallowValidByteCount = -1L
+  if(sizeInBytes > Int.MaxValue)
+    throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -59,7 +60,7 @@ class ByteBufferMessageSet(private val b
 
   private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
-      val iter = deepIterator
+      val iter = this.internalIterator(true)
       while(iter.hasNext) {
         val messageAndOffset = iter.next
         shallowValidByteCount = messageAndOffset.offset
@@ -70,12 +71,31 @@ class ByteBufferMessageSet(private val b
   }
   
   /** Write the messages in this set to the given channel */
-  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
-    channel.write(buffer.duplicate)
-  
-  override def iterator: Iterator[MessageAndOffset] = deepIterator
+  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+    buffer.mark()
+    val written = channel.write(buffer)
+    buffer.reset()
+    written
+  }
+
+  /** default iterator that iterates over decompressed messages */
+  override def iterator: Iterator[MessageAndOffset] = internalIterator()
+
+  /** iterator over compressed messages without decompressing */
+  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
+
+  def verifyMessageSize(maxMessageSize: Int){
+    var shallowIter = internalIterator(true)
+    while(shallowIter.hasNext){
+      var messageAndOffset = shallowIter.next
+      val payloadSize = messageAndOffset.message.payloadSize
+      if ( payloadSize > maxMessageSize)
+        throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize)
+    }
+  }
 
-  private def deepIterator(): Iterator[MessageAndOffset] = {
+  /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
+  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
@@ -106,33 +126,50 @@ class ByteBufferMessageSet(private val b
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
-        newMessage.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
-            innerIter = null
-            currValidBytes += 4 + size
-            trace("currValidBytes = " + currValidBytes)
-            new MessageAndOffset(newMessage, currValidBytes)
-          case _ =>
-            debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
-            innerIter = CompressionUtils.decompress(newMessage).deepIterator
-            if (!innerIter.hasNext) {
-              currValidBytes += 4 + lastMessageSize
+        if(!newMessage.isValid)
+          throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+            + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
+
+        if(isShallow){
+          currValidBytes += 4 + size
+          trace("shallow iterator currValidBytes = " + currValidBytes)
+          new MessageAndOffset(newMessage, currValidBytes)
+        }
+        else{
+          newMessage.compressionCodec match {
+            case NoCompressionCodec =>
+              debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
               innerIter = null
-            }
-            makeNext()
+              currValidBytes += 4 + size
+              trace("currValidBytes = " + currValidBytes)
+              new MessageAndOffset(newMessage, currValidBytes)
+            case _ =>
+              debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+              innerIter = CompressionUtils.decompress(newMessage).internalIterator()
+              if (!innerIter.hasNext) {
+                currValidBytes += 4 + lastMessageSize
+                innerIter = null
+              }
+              makeNext()
+          }
         }
       }
 
       override def makeNext(): MessageAndOffset = {
-        val isInnerDone = innerDone()
-        isInnerDone match {
-          case true => makeNextOuter
-          case false => {
-            val messageAndOffset = innerIter.next
-            if (!innerIter.hasNext)
-              currValidBytes += 4 + lastMessageSize
-            new MessageAndOffset(messageAndOffset.message, currValidBytes)
+        if(isShallow){
+          makeNextOuter
+        }
+        else{
+          val isInnerDone = innerDone()
+          debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
+          isInnerDone match {
+            case true => makeNextOuter
+            case false => {
+              val messageAndOffset = innerIter.next
+              if (!innerIter.hasNext)
+                currValidBytes += 4 + lastMessageSize
+              new MessageAndOffset(messageAndOffset.message, currValidBytes)
+            }
           }
         }
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala Thu May 31 01:51:23 2012
@@ -49,7 +49,7 @@ class GZIPCompression(inputStream: Input
   }
 
   override def read(a: Array[Byte]): Int = {
-    gzipIn.read(a)	
+    gzipIn.read(a)
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala Thu May 31 01:51:23 2012
@@ -20,4 +20,6 @@ package kafka.message
 /**
  * Indicates that a message failed its checksum and is corrupt
  */
-class InvalidMessageException extends RuntimeException
+class InvalidMessageException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala Thu May 31 01:51:23 2012
@@ -13,11 +13,10 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.message
 
-/**
- * Represents message and offset of the next message. This is used in the MessageSet to iterate over it
- */
-case class MessageAndOffset(val message: Message, val offset: Long)
+
+case class MessageAndOffset(message: Message, offset: Long)
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu May 31 01:51:23 2012
@@ -36,7 +36,7 @@ object ConsoleProducer { 
                            .withRequiredArg
                            .describedAs("connection_string")
                            .ofType(classOf[String])
-    val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
     val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
                              .withRequiredArg
@@ -78,7 +78,7 @@ object ConsoleProducer { 
 
     val topic = options.valueOf(topicOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val async = options.has(asyncOpt)
+    val sync = options.has(syncOpt)
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
@@ -89,10 +89,10 @@ object ConsoleProducer { 
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("compression.codec", DefaultCompressionCodec.codec.toString)
-    props.put("producer.type", if(async) "async" else "sync")
+    props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
-      props.put("batch.size", batchSize)
-    props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+      props.put("batch.size", batchSize.toString)
+    props.put("queue.time", sendTimeout.toString)
     props.put("serializer.class", encoderClass)
 
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu May 31 01:51:23 2012
@@ -22,9 +22,7 @@ import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging
-import kafka.serializer.Encoder
 import java.util.{Properties, Date}
-import kafka.message.Message
 import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -94,7 +92,3 @@ class KafkaLog4jAppender extends Appende
 
   override def requiresLayout: Boolean = false
 }
-
-class DefaultStringEncoder extends Encoder[LoggingEvent] {
-  override def toMessage(event: LoggingEvent):Message = new Message(event.getMessage.asInstanceOf[String].getBytes)
-}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Thu May 31 01:51:23 2012
@@ -32,10 +32,24 @@ class ProducerConfig(val props: Properti
   if(brokerList != null)
     throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
 
+  /**
+   * If DefaultEventHandler is used, this specifies the number of times to
+   * retry if an error is encountered during send. Currently, it is only
+   * appropriate when broker.list points to a VIP. If the zk.connect option
+   * is used instead, this will not have any effect because with the zk-based
+   * producer, brokers are not re-selected upon retry. So retries would go to
+   * the same (potentially still down) broker. (KAFKA-253 will help address
+   * this.)
+   */
+  val numRetries = Utils.getInt(props, "num.retries", 0)
+
   /** If both broker.list and zk.connect options are specified, throw an exception */
   if(zkConnect == null)
     throw new InvalidConfigException("zk.connect property is required")
 
+  if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
+    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu May 31 01:51:23 2012
@@ -18,10 +18,16 @@
 package kafka.producer
 
 import kafka.api._
-import kafka.common.MessageSizeTooLargeException
 import kafka.message.MessageSet
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
 import kafka.utils._
+import java.util.Random
+import kafka.common.MessageSizeTooLargeException
+
+object SyncProducer {
+  val RequestKey: Short = 0
+  val randomGenerator = new Random
+}
 
 /*
  * Send a message set.
@@ -31,14 +37,22 @@ class SyncProducer(val config: SyncProdu
   
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
+  /** make time-based reconnect starting at a random time **/
+  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
+
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
 
-  debug("Instantiating Scala Sync Producer")
+  trace("Instantiating Scala Sync Producer")
 
   private def verifyRequest(request: Request) = {
-    if (logger.isTraceEnabled) {
+    /**
+     * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
+     * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
+     * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
+     */
+    if (logger.isDebugEnabled) {
       val buffer = new BoundedByteBufferSend(request).buffer
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
@@ -71,9 +85,11 @@ class SyncProducer(val config: SyncProdu
       }
       // TODO: do we still need this?
       sentOnConnection += 1
-      if(sentOnConnection >= config.reconnectInterval) {
+
+      if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
         reconnect()
         sentOnConnection = 0
+        lastConnectionTime = System.currentTimeMillis
       }
       SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
       response

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Thu May 31 01:51:23 2012
@@ -40,6 +40,9 @@ trait SyncProducerConfigShared {
 
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
+  /** negative reconnect time interval means disabling this time-based reconnect feature */
+  var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
 
   /* the client application sending the producer requests */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu May 31 01:51:23 2012
@@ -70,10 +70,11 @@ class ProducerSendThread[K,V](val thread
             trace("Dequeued item for topic %s, partition key: %s, data: %s"
               .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
           events += currentQueueItem
-
-          // check if the batch size is reached
-          full = events.size >= batchSize
         }
+
+        // check if the batch size is reached
+        full = events.size >= batchSize
+
         if(full || expired) {
           if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
           if(full) debug("Batch full. Sending..")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu May 31 01:51:23 2012
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: Requ
           // TODO: need to handle ack's here!  Will probably move to another method.
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
-          log.append(partitionData.messages)
+          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu May 31 01:51:23 2012
@@ -71,7 +71,7 @@ class KafkaConfig(props: Properties) ext
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
   
   /* the maximum size of the log before deleting it */
-  val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Thu May 31 01:51:23 2012
@@ -17,9 +17,9 @@
 
 package kafka.server
 
-import org.apache.log4j._
 import kafka.network._
 import kafka.utils._
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * A thread that answers kafka requests.
@@ -60,3 +60,60 @@ class KafkaRequestHandlerPool(val reques
   }
   
 }
+
+trait BrokerTopicStatMBean {
+  def getMessagesIn: Long
+  def getBytesIn: Long
+  def getBytesOut: Long
+  def getFailedProduceRequest: Long
+  def getFailedFetchRequest: Long
+}
+
+@threadsafe
+class BrokerTopicStat extends BrokerTopicStatMBean {
+  private val numCumulatedMessagesIn = new AtomicLong(0)
+  private val numCumulatedBytesIn = new AtomicLong(0)
+  private val numCumulatedBytesOut = new AtomicLong(0)
+  private val numCumulatedFailedProduceRequests = new AtomicLong(0)
+  private val numCumulatedFailedFetchRequests = new AtomicLong(0)
+
+  def getMessagesIn: Long = numCumulatedMessagesIn.get
+
+  def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
+
+  def getBytesIn: Long = numCumulatedBytesIn.get
+
+  def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
+
+  def getBytesOut: Long = numCumulatedBytesOut.get
+
+  def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+
+  def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
+
+  def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+
+  def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+
+  def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
+}
+
+object BrokerTopicStat extends Logging {
+  private val stats = new Pool[String, BrokerTopicStat]
+  private val allTopicStat = new BrokerTopicStat
+  Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
+
+  def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
+
+  def getBrokerTopicStat(topic: String): BrokerTopicStat = {
+    var stat = stats.get(topic)
+    if (stat == null) {
+      stat = new BrokerTopicStat
+      if (stats.putIfNotExists(topic, stat) == null)
+        Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
+      else
+        stat = stats.get(topic)
+    }
+    return stat
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Thu May 31 01:51:23 2012
@@ -17,36 +17,21 @@
 
 package kafka.server
 
-import kafka.utils.{Utils, Logging}
-import kafka.consumer._
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
-import kafka.message.Message
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.Map
-
-class KafkaServerStartable(val serverConfig: KafkaConfig,
-                           val consumerConfig: ConsumerConfig,
-                           val producerConfig: ProducerConfig) extends Logging {
+import kafka.utils.Logging
+
+
+class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   private var server : KafkaServer = null
-  private var embeddedConsumer : EmbeddedConsumer = null
 
   init
 
-  def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
-
   private def init() {
     server = new KafkaServer(serverConfig)
-    if (consumerConfig != null)
-      embeddedConsumer =
-        new EmbeddedConsumer(consumerConfig, producerConfig, this)
   }
 
   def startup() {
     try {
       server.startup()
-      if (embeddedConsumer != null)
-        embeddedConsumer.startup()
     }
     catch {
       case e =>
@@ -57,8 +42,6 @@ class KafkaServerStartable(val serverCon
 
   def shutdown() {
     try {
-      if (embeddedConsumer != null)
-        embeddedConsumer.shutdown()
       server.shutdown()
     }
     catch {
@@ -73,153 +56,4 @@ class KafkaServerStartable(val serverCon
 
 }
 
-class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
-                       private val producerConfig: ProducerConfig,
-                       private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging {
-
-  private val whiteListTopics =
-    consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
-
-  private val blackListTopics =
-    consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
-
-  // mirrorTopics should be accessed by handleTopicEvent only
-  private var mirrorTopics:Seq[String] = List()
-
-  private var consumerConnector: ConsumerConnector = null
-  private var topicEventWatcher:ZookeeperTopicEventWatcher = null
-
-  private val producer = new Producer[Null, Message](producerConfig)
-
-  var threadList = List[MirroringThread]()
-
-  private def isTopicAllowed(topic: String) = {
-    if (consumerConfig.mirrorTopicsWhitelist.nonEmpty)
-      whiteListTopics.contains(topic)
-    else
-      !blackListTopics.contains(topic)
-  }
-
-  // TopicEventHandler call-back only
-  @Override
-  def handleTopicEvent(allTopics: Seq[String]) {
-    val newMirrorTopics = allTopics.filter(isTopicAllowed)
-
-    val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
-    if (addedTopics.nonEmpty)
-      info("topic event: added topics = %s".format(addedTopics))
-
-    val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
-    if (deletedTopics.nonEmpty)
-      info("topic event: deleted topics = %s".format(deletedTopics))
-
-    mirrorTopics = newMirrorTopics
-
-    if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
-      info("mirror topics = %s".format(mirrorTopics))
-      startNewConsumerThreads(makeTopicMap(mirrorTopics))
-    }
-  }
-
-  private def makeTopicMap(mirrorTopics: Seq[String]) = {
-    if (mirrorTopics.nonEmpty)
-      Utils.getConsumerTopicMap(mirrorTopics.mkString(
-        "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
-        ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
-    else
-      Utils.getConsumerTopicMap("")
-  }
-
-  private def startNewConsumerThreads(topicMap: Map[String, Int]) {
-    if (topicMap.nonEmpty) {
-      if (consumerConnector != null)
-        consumerConnector.shutdown()
-
-      /**
-       * Before starting new consumer threads for the updated set of topics,
-       * shutdown the existing mirroring threads. Since the consumer connector
-       * is already shutdown, the mirroring threads should finish their task almost
-       * instantaneously. If they don't, this points to an error that needs to be looked
-       * into, and further mirroring should stop
-       */
-      threadList.foreach(_.shutdown)
-
-      // KAFKA: 212: clear the thread list to remove the older thread references that are already shutdown
-      threadList = Nil
-
-      consumerConnector = Consumer.create(consumerConfig)
-      val topicMessageStreams =  consumerConnector.createMessageStreams(topicMap)
-      for ((topic, streamList) <- topicMessageStreams)
-        for (i <- 0 until streamList.length)
-          threadList ::= new MirroringThread(streamList(i), topic, i)
-
-      threadList.foreach(_.start)
-    }
-    else
-      info("Not starting mirroring threads (mirror topic list is empty)")
-  }
-
-  def startup() {
-    info("staring up embedded consumer")
-    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
-    /*
-    * consumer threads are (re-)started upon topic events (which includes an
-    * initial startup event which lists the current topics)
-    */
-  }
-
-  def shutdown() {
-    // first shutdown the topic watcher to prevent creating new consumer streams
-    if (topicEventWatcher != null)
-      topicEventWatcher.shutdown()
-    info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers")
-    // stop pulling more data for mirroring
-    if (consumerConnector != null)
-      consumerConnector.shutdown()
-    info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads")
-    // wait for all mirroring threads to stop
-    threadList.foreach(_.shutdown)
-    info("Stopped all existing mirroring threads, now stopping the producer")
-    // only then, shutdown the producer
-    producer.close()
-    info("Successfully shutdown this Kafka mirror")
-  }
-
-  class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with Logging {
-    val shutdownComplete = new CountDownLatch(1)
-    val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId)
-    this.setDaemon(false)
-    this.setName(name)
-
-
-    override def run = {
-      info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
-
-      try {
-        for (message <- stream) {
-          trace("Mirroring thread received message " + message.checksum)
-          val pd = new ProducerData[Null, Message](topic, message)
-          producer.send(pd)
-        }
-      }
-      catch {
-        case e =>
-          fatal(topic + " stream " + threadId + " unexpectedly exited", e)
-      }finally {
-        shutdownComplete.countDown
-        info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
-      }
-    }
-
-    def shutdown = {
-      try {
-        shutdownComplete.await
-      }catch {
-        case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. " +
-          "Mirroring thread might leak data!")
-      }
-    }
-  }
-}
-
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala Thu May 31 01:51:23 2012
@@ -82,15 +82,15 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with Logging {
+class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
     println("Starting consumer thread..")
     var count: Int = 0
     try {
-      for (message <- stream) {
-        println("consumed: " + message)
+      for (messageAndMetadata <- stream) {
+        println("consumed: " + messageAndMetadata.message)
         count += 1
       }
     }catch {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu May 31 01:51:23 2012
@@ -32,8 +32,6 @@ object ReplayLogProducer extends Logging
   private val GROUPID: String = "replay-log-producer"
 
   def main(args: Array[String]) {
-    var isNoPrint = false;
-
     val config = new Config(args)
 
     val executor = Executors.newFixedThreadPool(config.numThreads)
@@ -151,7 +149,7 @@ object ReplayLogProducer extends Logging
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with Logging {
+  class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
@@ -180,9 +178,9 @@ object ReplayLogProducer extends Logging
             stream.slice(0, config.numMessages)
           else
             stream
-        for (message <- iter) {
+        for (messageAndMetadata <- iter) {
           try {
-            producer.send(new ProducerData[Message, Message](config.outputTopic, message))
+            producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
             if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
               Thread.sleep(config.delayedMSBtwSend)
             messageCount += 1

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu May 31 01:51:23 2012
@@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends L
       // check if the rebalancing operation succeeded.
       try {
         if(validateRebalancingOperation(zkClient, group))
-          info("Rebalance operation successful !")
+          println("Rebalance operation successful !")
         else
-          error("Rebalance operation failed !")
+          println("Rebalance operation failed !")
       } catch {
         case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
       }
@@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends L
 
     rebalanceSucceeded
   }
-
-
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Thu May 31 01:51:23 2012
@@ -23,17 +23,21 @@ trait Logging {
   val loggerName = this.getClass.getName
   lazy val logger = Logger.getLogger(loggerName)
 
+  protected var logIdent = ""
+  
+  private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())
-      logger.trace(msg)	
+      logger.trace(msgWithLogIdent(msg))
   }
   def trace(e: => Throwable): Any = {
     if (logger.isTraceEnabled())
-      logger.trace("",e)	
+      logger.trace(logIdent,e)
   }
   def trace(msg: => String, e: => Throwable) = {
     if (logger.isTraceEnabled())
-      logger.trace(msg,e)
+      logger.trace(msgWithLogIdent(msg),e)
   }
   def swallowTrace(action: => Unit) {
     Utils.swallow(logger.trace, action)
@@ -41,15 +45,15 @@ trait Logging {
 
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
-      logger.debug(msg)
+      logger.debug(msgWithLogIdent(msg))
   }
   def debug(e: => Throwable): Any = {
     if (logger.isDebugEnabled())
-      logger.debug("",e)	
+      logger.debug(logIdent,e)
   }
   def debug(msg: => String, e: => Throwable) = {
     if (logger.isDebugEnabled())
-      logger.debug(msg,e)
+      logger.debug(msgWithLogIdent(msg),e)
   }
   def swallowDebug(action: => Unit) {
     Utils.swallow(logger.debug, action)
@@ -57,55 +61,54 @@ trait Logging {
 
   def info(msg: => String): Unit = {
     if (logger.isInfoEnabled())
-      logger.info(msg)
+      logger.info(msgWithLogIdent(msg))
   }
   def info(e: => Throwable): Any = {
     if (logger.isInfoEnabled())
-      logger.info("",e)
+      logger.info(logIdent,e)
   }
   def info(msg: => String,e: => Throwable) = {
     if (logger.isInfoEnabled())
-      logger.info(msg,e)
+      logger.info(msgWithLogIdent(msg),e)
   }
   def swallowInfo(action: => Unit) {
     Utils.swallow(logger.info, action)
   }
 
   def warn(msg: => String): Unit = {
-    logger.warn(msg)
+    logger.warn(msgWithLogIdent(msg))
   }
   def warn(e: => Throwable): Any = {
-    logger.warn("",e)
+    logger.warn(logIdent,e)
   }
   def warn(msg: => String, e: => Throwable) = {
-    logger.warn(msg,e)
+    logger.warn(msgWithLogIdent(msg),e)
   }
   def swallowWarn(action: => Unit) {
     Utils.swallow(logger.warn, action)
   }
   def swallow(action: => Unit) = swallowWarn(action)
 
-  def error(msg: => String):Unit = {
-    logger.error(msg)
+  def error(msg: => String): Unit = {
+    logger.error(msgWithLogIdent(msg))
   }		
   def error(e: => Throwable): Any = {
-    logger.error("",e)
+    logger.error(logIdent,e)
   }
   def error(msg: => String, e: => Throwable) = {
-    logger.error(msg,e)
+    logger.error(msgWithLogIdent(msg),e)
   }
   def swallowError(action: => Unit) {
     Utils.swallow(logger.error, action)
   }
 
   def fatal(msg: => String): Unit = {
-    logger.fatal(msg)
+    logger.fatal(msgWithLogIdent(msg))
   }
   def fatal(e: => Throwable): Any = {
-    logger.fatal("",e)
+    logger.fatal(logIdent,e)
   }	
   def fatal(msg: => String, e: => Throwable) = {
-    logger.fatal(msg,e)
+    logger.fatal(msgWithLogIdent(msg),e)
   }
- 
 }
\ No newline at end of file