You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:10 UTC
[25/36] git commit: kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao
kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a9faa49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a9faa49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a9faa49
Branch: refs/heads/trunk
Commit: 7a9faa49ed5c7581cb2bd6c86b68df06a8879fec
Parents: 1db824e
Author: Sam Meder <sa...@gmail.com>
Authored: Fri Aug 16 10:13:30 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 16 10:13:30 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 ++-
core/src/main/scala/kafka/tools/ImportZkOffsets.scala | 3 ---
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..c2b9b9a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- 0 until config.rebalanceMaxRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
- val cluster = getCluster(zkClient)
+ var cluster: Cluster = null
try {
+ cluster = getCluster(zkClient)
done = rebalance(cluster)
} catch {
case e =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..55709b5 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging {
}
private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
- val cluster = ZkUtils.getCluster(zkClient)
- var partitions: List[String] = Nil
-
for ((partition, offset) <- partitionOffsets) {
debug("updating [" + partition + "] with offset [" + offset + "]")