You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/01/22 02:26:26 UTC
kafka git commit: KAFKA-1848; check consumer shutting down flag inside retry loop; reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk fcc252185 -> a0852d477
KAFKA-1848; check consumer shutting down flag inside retry loop; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0852d47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0852d47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0852d47
Branch: refs/heads/trunk
Commit: a0852d477ec097bd4efe120c21de020fccf6dc49
Parents: fcc2521
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Wed Jan 21 17:25:54 2015 -0800
Committer: Guozhang Wang <gu...@linkedin.com>
Committed: Wed Jan 21 17:25:54 2015 -0800
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 57 ++++++++++----------
1 file changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a0852d47/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 191a867..5487259 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -613,36 +613,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def syncedRebalance() {
rebalanceLock synchronized {
rebalanceTimer.time {
- if(isShuttingDown.get()) {
- return
- } else {
- for (i <- 0 until config.rebalanceMaxRetries) {
- info("begin rebalancing consumer " + consumerIdString + " try #" + i)
- var done = false
- var cluster: Cluster = null
- try {
- cluster = getCluster(zkClient)
- done = rebalance(cluster)
- } catch {
- case e: Throwable =>
- /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
- * For example, a ZK node can disappear between the time we get all children and the time we try to get
- * the value of a child. Just let this go since another rebalance will be triggered.
- **/
- info("exception during rebalance ", e)
- }
- info("end rebalancing consumer " + consumerIdString + " try #" + i)
- if (done) {
- return
- } else {
- /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
- * clear the cache */
- info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
- }
- // stop all fetchers and clear all the queues to avoid data duplication
- closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
- Thread.sleep(config.rebalanceBackoffMs)
+ for (i <- 0 until config.rebalanceMaxRetries) {
+ if(isShuttingDown.get()) {
+ return
+ }
+ info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+ var done = false
+ var cluster: Cluster = null
+ try {
+ cluster = getCluster(zkClient)
+ done = rebalance(cluster)
+ } catch {
+ case e: Throwable =>
+ /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+ * For example, a ZK node can disappear between the time we get all children and the time we try to get
+ * the value of a child. Just let this go since another rebalance will be triggered.
+ **/
+ info("exception during rebalance ", e)
+ }
+ info("end rebalancing consumer " + consumerIdString + " try #" + i)
+ if (done) {
+ return
+ } else {
+ /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+ * clear the cache */
+ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
}
+ // stop all fetchers and clear all the queues to avoid data duplication
+ closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+ Thread.sleep(config.rebalanceBackoffMs)
}
}
}