You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/27 20:46:36 UTC

svn commit: r1294302 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Author: junrao
Date: Mon Feb 27 19:46:36 2012
New Revision: 1294302

URL: http://svn.apache.org/viewvc?rev=1294302&view=rev
Log:
consumers sometimes don't release partition ownership properly in ZK during rebalance; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-286

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1294302&r1=1294301&r2=1294302&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Feb 27 19:46:36 2012
@@ -413,16 +413,19 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def releasePartitionOwnership()= {
+    private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+      val topicDirs = new ZKGroupTopicDirs(group, topic)
+      val znode = topicDirs.consumerOwnerDir + "/" + partition
+      deletePath(zkClient, znode)
+      debug("Consumer " + consumerIdString + " releasing " + znode)
+    }
+
+    private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= {
       info("Releasing partition ownership")
-      for ((topic, infos) <- topicRegistry) {
-        val topicDirs = new ZKGroupTopicDirs(group, topic)
-        for(partition <- infos.keys) {
-          val znode = topicDirs.consumerOwnerDir + "/" + partition
-          deletePath(zkClient, znode)
-          debug("Consumer " + consumerIdString + " releasing " + znode)
-        }
-        topicRegistry.remove(topic)
+      for ((topic, infos) <- localTopicRegistry) {
+        for(partition <- infos.keys)
+          deletePartitionOwnershipFromZK(topic, partition.toString)
+        localTopicRegistry.remove(topic)
       }
     }
 
@@ -446,8 +449,6 @@ private[kafka] class ZookeeperConsumerCo
                * the value of a child. Just let this go since another rebalance will be triggered.
                **/
               info("exception during rebalance ", e)
-              /* Explicitly make sure another rebalancing attempt will get triggered. */
-              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
@@ -459,8 +460,6 @@ private[kafka] class ZookeeperConsumerCo
           }
           // stop all fetchers and clear all the queues to avoid data duplication
           closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
-          // release all partitions, reset state and retry
-          releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -481,7 +480,7 @@ private[kafka] class ZookeeperConsumerCo
        */
       closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
 
-      releasePartitionOwnership()
+      releasePartitionOwnership(topicRegistry)
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
@@ -534,8 +533,9 @@ private[kafka] class ZookeeperConsumerCo
         topicRegistry = currentTopicRegistry
         updateFetcher(cluster, kafkaMessageStreams)
         true
-      }else
+      }else {
         false
+      }
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
@@ -585,6 +585,7 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      var successfullyOwnedPartitions : List[(String, String)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1._1
         val partition = partitionOwner._1._2
@@ -594,6 +595,7 @@ private[kafka] class ZookeeperConsumerCo
         try {
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+          successfullyOwnedPartitions ::= (topic, partition)
           true
         }
         catch {
@@ -606,7 +608,11 @@ private[kafka] class ZookeeperConsumerCo
       }
       val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
       /* even if one of the partition ownership attempt has failed, return false */
-      if(hasPartitionOwnershipFailed > 0) false
+      if(hasPartitionOwnershipFailed > 0) {
+        // remove all paths that we have owned in ZK
+        successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
+        false
+      }
       else true
     }