You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:10 UTC

[25/36] git commit: kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao

kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a9faa49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a9faa49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a9faa49

Branch: refs/heads/trunk
Commit: 7a9faa49ed5c7581cb2bd6c86b68df06a8879fec
Parents: 1db824e
Author: Sam Meder <sa...@gmail.com>
Authored: Fri Aug 16 10:13:30 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 16 10:13:30 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala    | 3 ++-
 core/src/main/scala/kafka/tools/ImportZkOffsets.scala             | 3 ---
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..c2b9b9a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           for (i <- 0 until config.rebalanceMaxRetries) {
             info("begin rebalancing consumer " + consumerIdString + " try #" + i)
             var done = false
-            val cluster = getCluster(zkClient)
+            var cluster: Cluster = null
             try {
+              cluster = getCluster(zkClient)
               done = rebalance(cluster)
             } catch {
               case e =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..55709b5 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging {
   }
   
   private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
-    val cluster = ZkUtils.getCluster(zkClient)
-    var partitions: List[String] = Nil
-
     for ((partition, offset) <- partitionOffsets) {
       debug("updating [" + partition + "] with offset [" + offset + "]")