You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Joe Stein <cr...@gmail.com> on 2012/05/25 03:04:45 UTC

releasePartitionOwnership conflict

just about done with the trunk merge from 0.8 but ran into something here


    /**
     * Deletes the ZooKeeper path built from topicDirs.consumerOwnerDir, "/" and the
     * given partition, then emits a debug message naming this consumer and that path.
     *
     * @param topic     topic used (with the enclosing group) to build ZKGroupTopicDirs
     * @param partition partition identifier appended as the last path segment
     */
    private def deletePartitionOwnershipFromZK(topic: String, partition: String): Unit = {
      val ownerPath = new ZKGroupTopicDirs(group, topic).consumerOwnerDir + "/" + partition
      deletePath(zkClient, ownerPath)
      debug("Consumer " + consumerIdString + " releasing " + ownerPath)
    }

    // Releases partition ownership for every (topic, partition) pair in a topic registry
    // by deleting the corresponding owner path via zkClient.
    //
    // NOTE: this method body contains an UNRESOLVED merge conflict; both sides are shown
    // between the conflict markers below and the block does not compile as-is.
    //   * ".working" side: iterates topicRegistry (not the localTopicRegistry parameter),
    //     builds the path with getConsumerPartitionOwnerPath and calls deletePath inline.
    //   * ".merge-right.r1342339" side: iterates the localTopicRegistry parameter,
    //     delegates each partition to deletePartitionOwnershipFromZK, and then removes
    //     the topic from localTopicRegistry.
    // No return type is declared; the result is whatever the outer for loop evaluates to.
    private def releasePartitionOwnership(localTopicRegistry: Pool[String,
Pool[Partition, PartitionTopicInfo]])= {
      info("Releasing partition ownership")
<<<<<<< .working
      // .working side: reads topicRegistry and leaves the registry unmodified.
      for ((topic, infos) <- topicRegistry) {
        for(partition <- infos.keys) {
          val partitionOwnerPath = getConsumerPartitionOwnerPath(group,
topic, partition.toString)
          deletePath(zkClient, partitionOwnerPath)
          debug("Consumer " + consumerIdString + " releasing " +
partitionOwnerPath)
        }
=======
      // merge-right side: reads the parameter and removes each topic after releasing
      // its partitions. The inner for has no braces, so only the delete call is in it.
      for ((topic, infos) <- localTopicRegistry) {
        for(partition <- infos.keys)
          deletePartitionOwnershipFromZK(topic, partition.toString)
        localTopicRegistry.remove(topic)
>>>>>>> .merge-right.r1342339
      }
    }

Looks like the conflict is between the changes from KAFKA-300, KAFKA-239 and KAFKA-286.

Not sure what we want to do here. Thoughts?

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/