You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Joe Stein <cr...@gmail.com> on 2012/05/25 03:04:45 UTC
releasePartitionOwnership conflict
I'm just about done with the trunk merge from 0.8, but I ran into a merge conflict here:
/**
 * Deletes the owner znode for a single partition of a topic and logs the release.
 *
 * The znode path is built as `<consumerOwnerDir>/<partition>`, where
 * `consumerOwnerDir` comes from a ZKGroupTopicDirs created for `group` and `topic`.
 *
 * @param topic     topic whose owner directory is looked up
 * @param partition partition identifier appended to the owner directory path
 */
private def deletePartitionOwnershipFromZK(topic: String, partition:
String) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
val znode = topicDirs.consumerOwnerDir + "/" + partition
deletePath(zkClient, znode)
// The debug line is emitted after the deletePath call has been issued.
debug("Consumer " + consumerIdString + " releasing " + znode)
}
/**
 * Releases partition ownership by deleting the owner path of every partition
 * found in a topic registry.
 *
 * NOTE(review): the body still contains unresolved merge-conflict markers and is
 * not valid Scala until one side is chosen:
 *  - `.working` iterates `topicRegistry` (not the `localTopicRegistry` parameter),
 *    builds each path with getConsumerPartitionOwnerPath, and deletes it inline.
 *  - `.merge-right.r1342339` iterates `localTopicRegistry`, delegates each deletion
 *    to deletePartitionOwnershipFromZK, and then removes the topic from
 *    `localTopicRegistry` once its partitions have been processed.
 *
 * @param localTopicRegistry pool keyed by topic, each value a pool keyed by Partition;
 *                           only the `.merge-right` side reads or mutates it
 */
private def releasePartitionOwnership(localTopicRegistry: Pool[String,
Pool[Partition, PartitionTopicInfo]])= {
info("Releasing partition ownership")
<<<<<<< .working
for ((topic, infos) <- topicRegistry) {
for(partition <- infos.keys) {
val partitionOwnerPath = getConsumerPartitionOwnerPath(group,
topic, partition.toString)
deletePath(zkClient, partitionOwnerPath)
debug("Consumer " + consumerIdString + " releasing " +
partitionOwnerPath)
}
=======
for ((topic, infos) <- localTopicRegistry) {
for(partition <- infos.keys)
deletePartitionOwnershipFromZK(topic, partition.toString)
localTopicRegistry.remove(topic)
>>>>>>> .merge-right.r1342339
}
}
It looks like the conflict is between the changes from KAFKA-300, KAFKA-239 and KAFKA-286.
I'm not sure what we want to do here. Thoughts?
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/