Posted to commits@kafka.apache.org by gu...@apache.org on 2015/01/22 02:26:26 UTC

kafka git commit: KAFKA-1848; check consumer shutting down flag inside retry loop; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk fcc252185 -> a0852d477


KAFKA-1848; check consumer shutting down flag inside retry loop; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0852d47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0852d47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0852d47

Branch: refs/heads/trunk
Commit: a0852d477ec097bd4efe120c21de020fccf6dc49
Parents: fcc2521
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Wed Jan 21 17:25:54 2015 -0800
Committer: Guozhang Wang <gu...@linkedin.com>
Committed: Wed Jan 21 17:25:54 2015 -0800

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 57 ++++++++++----------
 1 file changed, 28 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0852d47/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 191a867..5487259 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -613,36 +613,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def syncedRebalance() {
       rebalanceLock synchronized {
         rebalanceTimer.time {
-          if(isShuttingDown.get())  {
-            return
-          } else {
-            for (i <- 0 until config.rebalanceMaxRetries) {
-              info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-              var done = false
-              var cluster: Cluster = null
-              try {
-                cluster = getCluster(zkClient)
-                done = rebalance(cluster)
-              } catch {
-                case e: Throwable =>
-                  /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-                    * For example, a ZK node can disappear between the time we get all children and the time we try to get
-                    * the value of a child. Just let this go since another rebalance will be triggered.
-                    **/
-                  info("exception during rebalance ", e)
-              }
-              info("end rebalancing consumer " + consumerIdString + " try #" + i)
-              if (done) {
-                return
-              } else {
-                /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
-                 * clear the cache */
-                info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
-              }
-              // stop all fetchers and clear all the queues to avoid data duplication
-              closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-              Thread.sleep(config.rebalanceBackoffMs)
+          for (i <- 0 until config.rebalanceMaxRetries) {
+            if(isShuttingDown.get())  {
+              return
+            }
+            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+            var done = false
+            var cluster: Cluster = null
+            try {
+              cluster = getCluster(zkClient)
+              done = rebalance(cluster)
+            } catch {
+              case e: Throwable =>
+                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                  * the value of a child. Just let this go since another rebalance will be triggered.
+                  **/
+                info("exception during rebalance ", e)
+            }
+            info("end rebalancing consumer " + consumerIdString + " try #" + i)
+            if (done) {
+              return
+            } else {
+              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+               * clear the cache */
+              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
             }
+            // stop all fetchers and clear all the queues to avoid data duplication
+            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+            Thread.sleep(config.rebalanceBackoffMs)
           }
         }
       }
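
----------------------------------------------------------------------

For context on the pattern this patch applies: the old code tested isShuttingDown once, before entering the retry loop, so a shutdown requested while the consumer was sleeping between failed rebalance attempts went unnoticed until rebalanceMaxRetries was exhausted. The patch moves the check to the top of each iteration, so every attempt re-reads the flag. Below is a minimal, self-contained sketch of that pattern outside the connector; the object RetryLoopSketch, retryUntilDoneOrShutdown, and doWork are illustrative names, not Kafka APIs.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the connector's retry logic, for illustration only.
object RetryLoopSketch {
  private val isShuttingDown = new AtomicBoolean(false)

  // Called from another thread to request shutdown.
  def shutdown(): Unit = isShuttingDown.set(true)

  // Retries doWork() up to maxRetries times, sleeping backoffMs between
  // attempts. The shutdown flag is re-checked at the top of every iteration,
  // so a shutdown requested during the backoff sleep is honored on the next
  // pass instead of only after all retries are exhausted.
  def retryUntilDoneOrShutdown(maxRetries: Int, backoffMs: Long)(doWork: () => Boolean): Unit = {
    for (i <- 0 until maxRetries) {
      if (isShuttingDown.get())
        return
      val done =
        try doWork()
        catch {
          case e: Throwable =>
            // Swallow transient failures; the next iteration retries,
            // mirroring how the connector tolerates transient ZK exceptions.
            println(s"attempt #$i failed: ${e.getMessage}")
            false
        }
      if (done)
        return
      Thread.sleep(backoffMs)
    }
  }
}

With the check outside the loop (the pre-patch shape), a shutdown() issued during Thread.sleep could not stop further attempts; with the check inside, the extra work after a shutdown request is bounded by at most the attempt already in flight.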