You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/27 20:46:36 UTC
svn commit: r1294302 -
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Author: junrao
Date: Mon Feb 27 19:46:36 2012
New Revision: 1294302
URL: http://svn.apache.org/viewvc?rev=1294302&view=rev
Log:
Consumers sometimes don't release partition ownership properly in ZK during rebalance; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-286
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1294302&r1=1294301&r2=1294302&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Feb 27 19:46:36 2012
@@ -413,16 +413,19 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def releasePartitionOwnership()= {
+ private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+ val topicDirs = new ZKGroupTopicDirs(group, topic)
+ val znode = topicDirs.consumerOwnerDir + "/" + partition
+ deletePath(zkClient, znode)
+ debug("Consumer " + consumerIdString + " releasing " + znode)
+ }
+
+ private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= {
info("Releasing partition ownership")
- for ((topic, infos) <- topicRegistry) {
- val topicDirs = new ZKGroupTopicDirs(group, topic)
- for(partition <- infos.keys) {
- val znode = topicDirs.consumerOwnerDir + "/" + partition
- deletePath(zkClient, znode)
- debug("Consumer " + consumerIdString + " releasing " + znode)
- }
- topicRegistry.remove(topic)
+ for ((topic, infos) <- localTopicRegistry) {
+ for(partition <- infos.keys)
+ deletePartitionOwnershipFromZK(topic, partition.toString)
+ localTopicRegistry.remove(topic)
}
}
@@ -446,8 +449,6 @@ private[kafka] class ZookeeperConsumerCo
* the value of a child. Just let this go since another rebalance will be triggered.
**/
info("exception during rebalance ", e)
- /* Explicitly make sure another rebalancing attempt will get triggered. */
- done = false
}
info("end rebalancing consumer " + consumerIdString + " try #" + i)
if (done) {
@@ -459,8 +460,6 @@ private[kafka] class ZookeeperConsumerCo
}
// stop all fetchers and clear all the queues to avoid data duplication
closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
- // release all partitions, reset state and retry
- releasePartitionOwnership()
Thread.sleep(config.rebalanceBackoffMs)
}
}
@@ -481,7 +480,7 @@ private[kafka] class ZookeeperConsumerCo
*/
closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
- releasePartitionOwnership()
+ releasePartitionOwnership(topicRegistry)
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
@@ -534,8 +533,9 @@ private[kafka] class ZookeeperConsumerCo
topicRegistry = currentTopicRegistry
updateFetcher(cluster, kafkaMessageStreams)
true
- }else
+ }else {
false
+ }
}
private def closeFetchersForQueues(cluster: Cluster,
@@ -585,6 +585,7 @@ private[kafka] class ZookeeperConsumerCo
}
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+ var successfullyOwnedPartitions : List[(String, String)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
val partition = partitionOwner._1._2
@@ -594,6 +595,7 @@ private[kafka] class ZookeeperConsumerCo
try {
createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+ successfullyOwnedPartitions ::= (topic, partition)
true
}
catch {
@@ -606,7 +608,11 @@ private[kafka] class ZookeeperConsumerCo
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
/* even if one of the partition ownership attempt has failed, return false */
- if(hasPartitionOwnershipFailed > 0) false
+ if(hasPartitionOwnershipFailed > 0) {
+ // remove all paths that we have owned in ZK
+ successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
+ false
+ }
else true
}