Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/02 20:07:50 UTC

svn commit: r1239766 - in /incubator/kafka/trunk: core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaa...

Author: nehanarkhede
Date: Thu Feb  2 19:07:48 2012
New Revision: 1239766

URL: http://svn.apache.org/viewvc?rev=1239766&view=rev
Log:
KAFKA-256 Bug in the consumer rebalancing logic leads to the consumer not pulling data from some partitions; patched by nehanarkhede; reviewed by joelkoshy

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/trunk/core/src/test/resources/log4j.properties
    incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala Thu Feb  2 19:07:48 2012
@@ -19,8 +19,6 @@ package kafka.api
 
 import java.nio._
 import kafka.network._
-import kafka.utils._
-import kafka.api._
 
 object MultiFetchRequest {
   def readFrom(buffer: ByteBuffer): MultiFetchRequest = {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala Thu Feb  2 19:07:48 2012
@@ -17,11 +17,7 @@
 
 package kafka.cluster
 
-import java.util.Arrays
 import kafka.utils._
-import java.net.InetAddress
-import kafka.server.KafkaConfig
-import util.parsing.json.JSON
 
 /**
  * A Kafka broker

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala Thu Feb  2 19:07:48 2012
@@ -17,7 +17,6 @@
 
 package kafka.cluster
 
-import kafka.utils._
 import scala.collection._
 
 /**
@@ -33,7 +32,7 @@ private[kafka] class Cluster {
       brokers.put(broker.id, broker)
   }
 
-  def getBroker(id: Int) = brokers.get(id).get
+  def getBroker(id: Int): Option[Broker] = brokers.get(id)
   
   def add(broker: Broker) = brokers.put(broker.id, broker)
   

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala Thu Feb  2 19:07:48 2012
@@ -17,7 +17,6 @@
 
 package kafka.common
 
-import kafka.consumer._
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
 import java.lang.Throwable

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Feb  2 19:07:48 2012
@@ -21,13 +21,11 @@ import scala.collection.mutable._
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
 import kafka.utils.{Utils, Logging}
-import kafka.utils.ZkUtils
 import kafka.utils.ZKStringSerializer
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Thu Feb  2 19:07:48 2012
@@ -22,6 +22,7 @@ import kafka.cluster._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.BlockingQueue
 import kafka.utils._
+import java.lang.IllegalStateException
 
 /**
  * The fetcher is a background thread that fetches data from a set of servers
@@ -73,7 +74,13 @@ private [consumer] class Fetcher(val con
 
     // open a new fetcher thread for each broker
     val ids = Set() ++ topicInfos.map(_.brokerId)
-    val brokers = ids.map(cluster.getBroker(_))
+    val brokers = ids.map { id =>
+      cluster.getBroker(id) match {
+        case Some(broker) => broker
+        case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")
+      }
+    }
+
     fetcherThreads = new Array[FetcherRunnable](brokers.size)
     var i = 0
     for(broker <- brokers) {
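
Taken together, the Cluster.scala and Fetcher.scala hunks above change Cluster.getBroker from brokers.get(id).get, which throws an unhelpful NoSuchElementException when a broker has gone away, to returning Option[Broker], forcing each caller to decide how a missing broker is handled. A minimal sketch (not part of this commit) of the two equivalent caller styles, assuming a cluster: Cluster and an Int broker id in scope:

    // Style used throughout this commit: pattern match and fail loudly.
    val broker: Broker = cluster.getBroker(id) match {
      case Some(b) => b
      case None => throw new IllegalStateException("Broker " + id + " is unavailable")
    }
    // Equivalent alternative (not used in the commit): collapse via getOrElse.
    val sameBroker: Broker = cluster.getBroker(id).getOrElse(
      throw new IllegalStateException("Broker " + id + " is unavailable"))

The same match-and-throw pattern appears again below in ZookeeperConsumerConnector.scala and UpdateOffsetsInZK.scala.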

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Thu Feb  2 19:07:48 2012
@@ -21,7 +21,7 @@ import scala.collection._
 import scala.util.parsing.json.JSON
 import kafka.utils.Logging
 
-private[consumer] object TopicCount extends Logging {
+private[kafka] object TopicCount extends Logging {
   val myConversionFunc = {input : String => input.toInt}
   JSON.globalNumberParser = myConversionFunc
 
@@ -44,7 +44,7 @@ private[consumer] object TopicCount exte
 
 }
 
-private[consumer] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
 
   def getConsumerThreadIdsPerTopic()
     : Map[String, Set[String]] = {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Feb  2 19:07:48 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,6 +30,8 @@ import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
+import java.lang.IllegalStateException
+import kafka.utils.ZkUtils._
 
 /**
  * This class handles the consumer's interaction with zookeeper
@@ -157,7 +159,7 @@ private[kafka] class ZookeeperConsumerCo
 
     var consumerUuid : String = null
     config.consumerId match {
-      case Some(consumerId) // for testing only 
+      case Some(consumerId) // for testing only
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
@@ -193,7 +195,7 @@ private[kafka] class ZookeeperConsumerCo
 
     ret.foreach { topicAndStreams =>
       // register on broker partition path changes
-      val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topicAndStreams._1
+      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 
@@ -204,7 +206,7 @@ private[kafka] class ZookeeperConsumerCo
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
@@ -239,7 +241,7 @@ private[kafka] class ZookeeperConsumerCo
       for (info <- infos.values) {
         val newOffset = info.getConsumeOffset
         try {
-          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
             newOffset.toString)
         }
         catch {
@@ -289,7 +291,7 @@ private[kafka] class ZookeeperConsumerCo
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
-      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)
       if (offsetString != null)
         return offsetString.toLong
       else
@@ -309,8 +311,12 @@ private[kafka] class ZookeeperConsumerCo
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
-      val cluster = ZkUtils.getCluster(zkClient)
-      val broker = cluster.getBroker(brokerId)
+      val cluster = getCluster(zkClient)
+      val broker = cluster.getBroker(brokerId) match {
+        case Some(b) => b
+        case None => throw new IllegalStateException("Broker " + brokerId + " is unavailable. Cannot issue " +
+          "getOffsetsBefore request")
+      }
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
                                             ConsumerConfig.SocketBufferSize)
       val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
@@ -358,7 +364,7 @@ private[kafka] class ZookeeperConsumerCo
       loadBalancerListener.syncedRebalance
 
       // There is no need to resubscribe to child and state changes.
-      // The child change watchers will be set inside rebalance when we read the children list. 
+      // The child change watchers will be set inside rebalance when we read the children list.
     }
 
   }
@@ -376,34 +382,17 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def releasePartitionOwnership()= {
+      info("Releasing partition ownership")
       for ((topic, infos) <- topicRegistry) {
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
           val znode = topicDirs.consumerOwnerDir + "/" + partition
-          ZkUtils.deletePath(zkClient, znode)
+          deletePath(zkClient, znode)
           debug("Consumer " + consumerIdString + " releasing " + znode)
         }
       }
     }
 
-    private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = {
-      val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
-      val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
-      for (consumer <- consumers) {
-        val topicCount = getTopicCount(consumer)
-        for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
-          for (consumerThreadId <- consumerThreadIdSet)
-            consumersPerTopicMap.get(topic) match {
-              case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
-              case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
-            }
-        }
-      }
-      for ( (topic, consumerList) <- consumersPerTopicMap )
-        consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
-      consumersPerTopicMap
-    }
-
     private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
                                     newPartMap: Map[String,List[String]],
                                     oldPartMap: Map[String,List[String]],
@@ -416,11 +405,6 @@ private[kafka] class ZookeeperConsumerCo
       relevantTopicThreadIdsMap
     }
 
-    private def getTopicCount(consumerId: String) : TopicCount = {
-      val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
-      TopicCount.constructTopicCount(consumerId, topicCountJson)
-    }
-
     def resetState() {
       topicRegistry.clear
       oldConsumersPerTopicMap.clear
@@ -432,19 +416,34 @@ private[kafka] class ZookeeperConsumerCo
         for (i <- 0 until config.maxRebalanceRetries) {
           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
+          val cluster = getCluster(zkClient)
           try {
-            done = rebalance()
+            done = rebalance(cluster)
           }
           catch {
             case e =>
-              // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-              // For example, a ZK node can disappear between the time we get all children and the time we try to get
-              // the value of a child. Just let this go since another rebalance will be triggered.
+              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+               * For example, a ZK node can disappear between the time we get all children and the time we try to get
+               * the value of a child. Just let this go since another rebalance will be triggered.
+               **/
               info("exception during rebalance ", e)
+              /* Explicitly make sure another rebalancing attempt will get triggered. */
+              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
-          if (done)
+          if (done) {
             return
+          } else {
+            /* Here the cache is at risk of being stale. To make future rebalancing decisions correctly, we should
+             * clear the cache */
+            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+            oldConsumersPerTopicMap.clear()
+            oldPartitionsPerTopicMap.clear()
+          }
+          // commit offsets
+          commitOffsets()
+          // stop all fetchers and clear all the queues to avoid data duplication
+          closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
           // release all partitions, reset state and retry
           releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
@@ -454,26 +453,30 @@ private[kafka] class ZookeeperConsumerCo
       throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
     }
 
-    private def rebalance(): Boolean = {
-      val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
-      val cluster = ZkUtils.getCluster(zkClient)
-      val consumersPerTopicMap = getConsumersPerTopic(group)
-      val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+    private def rebalance(cluster: Cluster): Boolean = {
+      val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+      val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
+      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
       val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
       if (relevantTopicThreadIdsMap.size <= 0) {
-        info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+        info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
+          format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
+        debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
+        debug("Consumers per topic cache " + oldConsumersPerTopicMap)
         return true
       }
 
-      // fetchers must be stopped to avoid data duplication, since if the current
-      // rebalancing attempt fails, the partitions that are released could be owned by another consumer.
-      // But if we don't stop the fetchers first, this consumer would continue returning data for released
-      // partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+      /**
+       * fetchers must be stopped to avoid data duplication, since if the current
+       * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+       * But if we don't stop the fetchers first, this consumer would continue returning data for released
+       * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+       */
       closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
 
-      info("Releasing partition ownership")
       releasePartitionOwnership()
 
+      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
         topicRegistry.remove(topic)
         topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
@@ -505,41 +508,60 @@ private[kafka] class ZookeeperConsumerCo
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
               val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
-              if (ownPartition)
-                info(consumerThreadId + " successfully owned partition " + partition)
-              else
+              if (!ownPartition)
                 return false
+              else // record the partition ownership decision
+                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
           }
         }
       }
-      updateFetcher(cluster, kafkaMessageStreams)
-      oldPartitionsPerTopicMap = partitionsPerTopicMap
-      oldConsumersPerTopicMap = consumersPerTopicMap
-      true
+
+      /**
+       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+       */
+      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+        info("Updating the cache")
+        debug("Partitions per topic cache " + partitionsPerTopicMap)
+        debug("Consumers per topic cache " + consumersPerTopicMap)
+        oldPartitionsPerTopicMap = partitionsPerTopicMap
+        oldConsumersPerTopicMap = consumersPerTopicMap
+        updateFetcher(cluster, kafkaMessageStreams)
+        true
+      } else
+        false
     }
 
-    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
-                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
-      // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
-      // after this rebalancing attempt
-      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+    private def closeFetchersForQueues(cluster: Cluster,
+                                       kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                                       queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesTobeCleared, kafkaMessageStreams)
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
         info("Committing all offsets after clearing the fetcher queues")
-        // here, we need to commit offsets before stopping the consumer from returning any more messages
-        // from the current data chunk. Since partition ownership is not yet released, this commit offsets
-        // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-        // for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-        // by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-        // successfully and the fetchers restart to fetch more data chunks
+        /**
+        * here, we need to commit offsets before stopping the consumer from returning any more messages
+        * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+        * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+        * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+        * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+        * successfully and the fetchers restart to fetch more data chunks
+        **/
         commitOffsets
         case None =>
       }
     }
 
+    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+      // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
+      // after this rebalancing attempt
+      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+      closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+    }
+
     private def updateFetcher[T](cluster: Cluster,
                                  kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
       // update partitions for fetcher
@@ -560,18 +582,47 @@ private[kafka] class ZookeeperConsumerCo
     private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
                                  topic: String, consumerThreadId: String) : Boolean = {
       val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-      try {
-        ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+      // check if some other consumer owns this partition at this time
+      val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
+      if(currentPartitionOwner != null) {
+        if(currentPartitionOwner.equals(consumerThreadId)) {
+          info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
+          addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+          true
+        }
+        else {
+          info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
+          false
+        }
+      } else {
+        addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+        true
       }
-      catch {
-        case e: ZkNodeExistsException =>
-        // The node hasn't been deleted by the original owner. So wait a bit and retry.
-          info("waiting for the partition ownership to be deleted: " + partition)
-          return false
-        case e2 => throw e2
+    }
+
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
+        val topic = partitionOwner._1._1
+        val partition = partitionOwner._1._2
+        val consumerThreadId = partitionOwner._2
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+        try {
+          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+          info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+          true
+        }
+        catch {
+          case e: ZkNodeExistsException =>
+            // The node hasn't been deleted by the original owner. So wait a bit and retry.
+            info("waiting for the partition ownership to be deleted: " + partition)
+            false
+          case e2 => throw e2
+        }
       }
-      addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-      true
+      val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
+      if(success > 0) false       /* even if one of the partition ownership attempts fails, return false */
+      else true
     }
 
     private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
@@ -580,7 +631,7 @@ private[kafka] class ZookeeperConsumerCo
       val partTopicInfoMap = topicRegistry.get(topic)
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
-      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)
       // If first time starting a consumer, set the initial offset based on the config
       var offset : Long = 0L
       if (offsetString == null)
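
Pulling the ZookeeperConsumerConnector hunks together: a rebalancing attempt now records each ownership claim in partitionOwnershipDecision and writes the owner znodes only via reflectPartitionOwnershipDecision once every partition could be claimed, and a failed attempt commits offsets, shuts down the fetchers, clears the now-stale caches, and releases ownership before backing off. A condensed sketch of the resulting retry loop inside syncedRebalance (not the verbatim source; logging trimmed):

    for (i <- 0 until config.maxRebalanceRetries) {
      val cluster = getCluster(zkClient)
      val done =
        try { rebalance(cluster) }
        catch { case e => info("exception during rebalance ", e); false }
      if (done)
        return
      // the cached topic/consumer maps may be stale; clear them so the next
      // attempt recomputes them from ZK
      oldConsumersPerTopicMap.clear()
      oldPartitionsPerTopicMap.clear()
      commitOffsets()                            // persist consumed offsets first
      closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
      releasePartitionOwnership()                // delete owner znodes so others can claim
      Thread.sleep(config.rebalanceBackoffMs)    // back off before retrying
    }
    throw new ConsumerRebalanceFailedException(consumerIdString +
      " can't rebalance after " + config.maxRebalanceRetries + " retries")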

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Thu Feb  2 19:07:48 2012
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.server.KafkaServerStartable

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Thu Feb  2 19:07:48 2012
@@ -16,10 +16,8 @@
 */
 package kafka.javaapi
 
-import java.nio.ByteBuffer
 import kafka.serializer.Encoder
-import kafka.producer.{ProducerConfig, ProducerPool}
-import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.producer.async.QueueItem
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Thu Feb  2 19:07:48 2012
@@ -17,8 +17,7 @@
 
 package kafka.javaapi.message
 
-import java.nio.channels._
-import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
 
 /**
  * A set of messages. A message set has a fixed serialized form, though the container

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala Thu Feb  2 19:07:48 2012
@@ -54,7 +54,6 @@ class Producer[K,V](config: ProducerConf
    * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
    * object in the  send API
    */
-  import kafka.javaapi.Implicits._
   def this(config: ProducerConfig,
            encoder: Encoder[V],
            eventHandler: kafka.javaapi.producer.async.EventHandler[V],

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Thu Feb  2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.message
 
 import java.io.InputStream
 import java.nio.ByteBuffer
-import scala.Math
 
 class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
   override def read():Int  = {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Feb  2 19:07:48 2012
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Thu Feb  2 19:07:48 2012
@@ -54,8 +54,8 @@ class GZIPCompression(inputStream: Input
 }
 
 class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream)  extends CompressionFacade(inputStream,outputStream) {
-  import org.xerial.snappy.{SnappyInputStream}
-  import org.xerial.snappy.{SnappyOutputStream}
+  import org.xerial.snappy.SnappyInputStream
+  import org.xerial.snappy.SnappyOutputStream
   
   val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
   val snappyOut:SnappyOutputStream = if (outputStream == null) null else new  SnappyOutputStream(outputStream)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Thu Feb  2 19:07:48 2012
@@ -22,8 +22,6 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
 
-import kafka._
-import kafka.message._
 import kafka.utils._
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala Thu Feb  2 19:07:48 2012
@@ -18,9 +18,6 @@
 package kafka.message
 
 import java.nio._
-import java.nio.channels._
-import java.util.zip.CRC32
-import java.util.UUID
 import kafka.utils._
 import kafka.common.UnknownMagicByteException
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Thu Feb  2 19:07:48 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.net._
 import java.io._
-import java.nio._
 import java.nio.channels._
 
 import kafka.utils._

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala Thu Feb  2 19:07:48 2012
@@ -17,8 +17,6 @@
 
 package kafka.network
 
-import java.util.concurrent.atomic._
-import javax.management._
 import kafka.utils._
 import kafka.api.RequestKeys
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu Feb  2 19:07:48 2012
@@ -18,14 +18,10 @@
 package kafka.producer
 
 import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
-import java.util.Random
 import java.io._
 import kafka.message._
-import kafka.utils._
 import kafka.serializer._
 
 object ConsoleProducer { 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Feb  2 19:07:48 2012
@@ -19,7 +19,7 @@ package kafka.producer
 
 import async.MissingConfigException
 import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
-import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Thu Feb  2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.producer
 
 import kafka.utils.Utils
 import java.util.Properties
-import kafka.message.{CompressionUtils, CompressionCodec}
 
 class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
   /** the broker to which the producer sends events */

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Thu Feb  2 19:07:48 2012
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import scala.reflect.BeanProperty
 import kafka.log.LogManager
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala Thu Feb  2 19:07:48 2012
@@ -17,10 +17,7 @@
 
 package kafka.server
 
-import java.nio._
-import java.nio.channels._
 import kafka.network._
-import kafka.message._
 import kafka.utils._
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala Thu Feb  2 19:07:48 2012
@@ -22,7 +22,7 @@ import java.util.Date
 import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
-import joptsimple.{OptionSet, OptionParser}
+import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala Thu Feb  2 19:07:48 2012
@@ -17,12 +17,9 @@
 
 package kafka.tools
 
-import java.net.URI
 import java.io._
 import joptsimple._
-import kafka.message._
 import kafka.producer._
-import java.util.Properties
 import kafka.utils.Utils
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu Feb  2 19:07:48 2012
@@ -17,19 +17,17 @@
 
 package kafka.tools
 
-import java.io.File
 import joptsimple.OptionParser
-import org.apache.log4j.Logger
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
 import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.DefaultEncoder
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils, Logging}
+import kafka.utils.{ZKStringSerializer, Logging}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
-import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+import kafka.message.{CompressionCodec, Message}
 
 object ReplayLogProducer extends Logging {
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Thu Feb  2 19:07:48 2012
@@ -22,7 +22,6 @@ import joptsimple._
 import kafka.api.FetchRequest
 import kafka.utils._
 import kafka.consumer._
-import kafka.server._
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer

Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1239766&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu Feb  2 19:07:48 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer}
+
+object VerifyConsumerRebalance extends Logging {
+  def main(args: Array[String]) {
+    val parser = new OptionParser()
+
+    val zkConnectOpt = parser.accepts("zk.connect", "ZooKeeper connect string.").
+      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
+    val groupOpt = parser.accepts("group", "Consumer group.").
+      withRequiredArg().ofType(classOf[String])
+    parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    for (opt <- List(groupOpt))
+      if (!options.has(opt)) {
+        System.err.println("Missing required argument: %s".format(opt))
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val group = options.valueOf(groupOpt)
+
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+      debug("zkConnect = %s; group = %s".format(zkConnect, group))
+
+      // check if the rebalancing operation succeeded.
+      try {
+        if(validateRebalancingOperation(zkClient, group))
+          info("Rebalance operation successful !")
+        else
+          error("Rebalance operation failed !")
+      } catch {
+        case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
+      }
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  private def validateRebalancingOperation(zkClient: ZkClient, group: String): Boolean = {
+    info("Verifying rebalancing operation for consumer group " + group)
+    var rebalanceSucceeded: Boolean = true
+    /**
+     * A successful rebalancing operation would select an owner for each available partition
+     * This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists
+     * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
+     */
+    val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keys.iterator)
+
+    partitionsPerTopicMap.foreach { partitionsForTopic =>
+      val topic = partitionsForTopic._1
+      val partitions = partitionsForTopic._2
+      val topicDirs = new ZKGroupTopicDirs(group, topic)
+      info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
+      info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
+      val partitionsWithOwners = ZkUtils.getChildrenParentMayNotExist(zkClient, topicDirs.consumerOwnerDir)
+      if(partitionsWithOwners.size == 0) {
+        error("No owners for any partitions for topic " + topic)
+        rebalanceSucceeded = false
+      }
+      debug("Children of " + topicDirs.consumerOwnerDir + " = " + partitionsWithOwners.toString)
+      val consumerIdsForTopic = consumersPerTopicMap.get(topic)
+
+      // for each available partition for topic, check if an owner exists
+      partitions.foreach { partition =>
+        // check if there is a node for [partition]
+        if(!partitionsWithOwners.exists(p => p.equals(partition))) {
+          error("No owner for topic %s partition %s".format(topic, partition))
+          rebalanceSucceeded = false
+        }
+        // try reading the partition owner path to see if a valid consumer id exists there
+        val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
+        if(partitionOwner == null) {
+          error("No owner for topic %s partition %s".format(topic, partition))
+          rebalanceSucceeded = false
+        }
+        else {
+          // check if the owner is a valid consumer id
+          consumerIdsForTopic match {
+            case Some(consumerIds) =>
+              if(!consumerIds.contains(partitionOwner)) {
+                error(("Owner %s for topic %s partition %s is not a valid member of consumer " +
+                  "group %s").format(partitionOwner, topic, partition, group))
+                rebalanceSucceeded = false
+              }
+              else
+                info("Owner of topic %s partition %s is %s".format(topic, partition, partitionOwner))
+            case None => {
+              error("No consumer ids registered for topic " + topic)
+              rebalanceSucceeded = false
+            }
+          }
+        }
+      }
+
+    }
+
+    rebalanceSucceeded
+  }
+
+
+}
\ No newline at end of file
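
The new tool walks /brokers/topics/[topic] for the alive partitions and /consumers/[group]/owners/[topic] for their owners, and reports any partition without a valid owner. Standalone, it can be invoked the same way the broker_failure system test below does (the group name here is a placeholder):

    bin/kafka-run-class.sh kafka.tools.VerifyConsumerRebalance \
        --zk.connect=localhost:2181 --group my-consumer-group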

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Thu Feb  2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.utils
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.utils._
 
 /**
  * A scheduler for running jobs in the background

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala Thu Feb  2 19:07:48 2012
@@ -17,9 +17,8 @@
 
 package kafka.utils
 
-import scala.math._
 
-/** 
+/**
  * A generic range value with a start and end 
  */
 trait Range {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Thu Feb  2 19:07:48 2012
@@ -21,6 +21,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
 import kafka.cluster.Partition
 import kafka.api.OffsetRequest
+import java.lang.IllegalStateException
 
 /**
  *  A utility that updates the offset of every broker partition to the offset of latest log segment file, in ZK.
@@ -55,7 +56,11 @@ object UpdateOffsetsInZK {
     var numParts = 0
     for (partString <- partitions) {
       val part = Partition.parse(partString)
-      val broker = cluster.getBroker(part.brokerId)
+      val broker = cluster.getBroker(part.brokerId) match {
+        case Some(b) => b
+        case None => throw new IllegalStateException("Broker " + part.brokerId + " is unavailable. Cannot issue " +
+          "getOffsetsBefore request")
+      }
       val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
       val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Thu Feb  2 19:07:48 2012
@@ -23,7 +23,6 @@ import java.nio.channels._
 import java.util.concurrent.atomic._
 import java.lang.management._
 import java.util.zip.CRC32
-import org.apache.log4j.Logger
 import javax.management._
 import java.util.Properties
 import scala.collection._

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Feb  2 19:07:48 2012
@@ -23,6 +23,7 @@ import kafka.cluster.{Broker, Cluster}
 import scala.collection._
 import java.util.Properties
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import kafka.consumer.TopicCount
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -236,6 +237,44 @@ object ZkUtils extends Logging {
     val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
     zkClient.delete(brokerPartTopicPath)
   }
+
+  def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {
+    val dirs = new ZKGroupDirs(group)
+    getChildren(zkClient, dirs.consumerRegistryDir)
+  }
+
+  def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
+    val dirs = new ZKGroupDirs(group)
+    val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    TopicCount.constructTopicCount(consumerId, topicCountJson)
+  }
+
+  def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
+    val dirs = new ZKGroupDirs(group)
+    val consumersInGroup = getConsumersInGroup(zkClient, group)
+    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)))
+    consumersInGroup.zip(topicCountMaps).toMap
+  }
+
+  def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
+    val dirs = new ZKGroupDirs(group)
+    val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
+    val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+    for (consumer <- consumers) {
+      val topicCount = getTopicCount(zkClient, group, consumer)
+      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+        for (consumerThreadId <- consumerThreadIdSet)
+          consumersPerTopicMap.get(topic) match {
+            case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
+            case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
+          }
+      }
+    }
+    for ( (topic, consumerList) <- consumersPerTopicMap )
+      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
+    consumersPerTopicMap
+  }
 }
 
 object ZKStringSerializer extends ZkSerializer {
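
getConsumersPerTopic and getTopicCount move here from ZookeeperConsumerConnector (with getConsumersInGroup and getConsumerTopicMaps added alongside) so that tools such as VerifyConsumerRebalance can read the consumer registry through the same code path. A minimal sketch of using the new helpers (not part of this commit; the connect string and group name are placeholders):

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZkUtils, ZKStringSerializer}

    // dump which consumer thread ids are subscribed to each topic in a group
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      for ((topic, threadIds) <- ZkUtils.getConsumersPerTopic(zkClient, "my-consumer-group"))
        println("%s -> %s".format(topic, threadIds.mkString(", ")))
    } finally {
      zkClient.close()
    }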

Modified: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Thu Feb  2 19:07:48 2012
@@ -21,4 +21,5 @@ log4j.appender.stdout.layout.ConversionP
 log4j.logger.kafka=WARN
 
 # zkclient can be verbose; during debugging it is common to adjust it separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
\ No newline at end of file
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN

Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Thu Feb  2 19:07:48 2012
@@ -68,7 +68,7 @@ readonly test_start_time="$(date +%s)"
 
 readonly num_msg_per_batch=500
 readonly batches_per_iteration=5
-readonly num_iterations=10
+readonly num_iterations=12
 
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182
@@ -132,6 +132,8 @@ producer_performance_crc_log=$base_dir/p
 producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log
 producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log
 
+consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log
+
 consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
 checksum_diff_log=$base_dir/checksum_diff.log
 
@@ -173,6 +175,17 @@ get_random_range() {
     return $(($(($RANDOM % range)) + $lo))
 }
 
+verify_consumer_rebalancing() {
+
+    info "Verifying consumer rebalancing operation"
+
+    $base_dir/bin/kafka-run-class.sh \
+        kafka.tools.VerifyConsumerRebalance \
+        --zk.connect=localhost:2181 \
+        --group $consumer_grp \
+        >> $consumer_rebalancing_log 2>&1
+}
+
 wait_for_zero_consumer_lags() {
 
     # no of times to check for zero lagging
@@ -618,6 +631,7 @@ start_test() {
                     sleep $wait_time_after_restarting_broker
                 fi
             fi
+            verify_consumer_rebalancing
         else
             info "No bouncing performed"
         fi
@@ -662,6 +676,8 @@ start_console_consumer_for_mirror_produc
 wait_for_zero_source_console_consumer_lags
 wait_for_zero_mirror_console_consumer_lags
 
+verify_consumer_rebalancing
+
 shutdown_servers
 
 cmp_checksum

Modified: incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties Thu Feb  2 19:07:48 2012
@@ -26,13 +26,15 @@ log4j.appender.stdout.layout.ConversionP
 
 # Turn on all our debugging info
 #log4j.logger.kafka=INFO
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
+log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.kafka.consumer=DEBUG
 log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
 log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
 #log4j.logger.kafka.producer.async.AsyncProducer=TRACE
 #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
 log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
 
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG