Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:04 UTC

[19/36] git commit: kafka-989; Race condition shutting down high-level consumer results in spinning background thread; patched by Phil Hargett; reviewed by Jun Rao

kafka-989; Race condition shutting down high-level consumer results in spinning background thread; patched by Phil Hargett; reviewed by Jun Rao
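
For context, a minimal sketch of the coordination pattern this patch applies
(class and member names below are illustrative, not Kafka's actual API):
shutdown() and syncedRebalance() serialize on a single lock, and the rebalance
path checks the shutdown flag before doing any work, so a background rebalance
triggered during shutdown exits cleanly instead of spinning against resources
that shutdown() has already released.

    import java.util.concurrent.atomic.AtomicBoolean

    class ConnectorLike {
      private val rebalanceLock = new Object
      private val isShuttingDown = new AtomicBoolean(false)

      def shutdown(): Unit = rebalanceLock synchronized {
        // Only the first caller performs teardown; later calls are no-ops.
        if (isShuttingDown.compareAndSet(false, true)) {
          // ... stop fetchers, drain queues, commit offsets, close the ZK client ...
          println("shut down")
        }
      }

      def syncedRebalance(): Unit = rebalanceLock synchronized {
        // Skip rebalancing entirely once shutdown has begun, rather than
        // retrying against a client that shutdown() may have closed.
        if (!isShuttingDown.get()) {
          // ... rebalance retry loop with backoff goes here ...
          println("rebalanced")
        }
      }
    }

Because shutdown() takes rebalanceLock, it waits for any in-flight rebalance to
finish before tearing down; any rebalance queued behind it then sees the flag
set and returns immediately.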


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d6ad3d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d6ad3d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d6ad3d7

Branch: refs/heads/trunk
Commit: 1d6ad3d7d441bc7174a445a4e5e49319ee1003eb
Parents: 7fbbb66
Author: Phil Hargett <ph...@mirror-image.com>
Authored: Mon Aug 5 18:59:34 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Aug 5 18:59:34 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 94 +++++++++++---------
 1 file changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d6ad3d7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..0ca2850 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -153,32 +153,34 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def shutdown() {
-    val canShutdown = isShuttingDown.compareAndSet(false, true);
-    if (canShutdown) {
-      info("ZKConsumerConnector shutting down")
+    rebalanceLock synchronized {
+      val canShutdown = isShuttingDown.compareAndSet(false, true);
+      if (canShutdown) {
+        info("ZKConsumerConnector shutting down")
 
-      if (wildcardTopicWatcher != null)
-        wildcardTopicWatcher.shutdown()
-      try {
-        if (config.autoCommitEnable)
-          scheduler.shutdownNow()
-        fetcher match {
-          case Some(f) => f.stopConnections
-          case None =>
-        }
-        sendShutdownToAllQueues()
-        if (config.autoCommitEnable)
-          commitOffsets()
-        if (zkClient != null) {
-          zkClient.close()
-          zkClient = null
+        if (wildcardTopicWatcher != null)
+          wildcardTopicWatcher.shutdown()
+        try {
+          if (config.autoCommitEnable)
+            scheduler.shutdownNow()
+          fetcher match {
+            case Some(f) => f.stopConnections
+            case None =>
+          }
+          sendShutdownToAllQueues()
+          if (config.autoCommitEnable)
+            commitOffsets()
+          if (zkClient != null) {
+            zkClient.close()
+            zkClient = null
+          }
+        } catch {
+          case e =>
+            fatal("error during consumer connector shutdown", e)
         }
-      } catch {
-        case e =>
-          fatal("error during consumer connector shutdown", e)
+        info("ZKConsumerConnector shut down completed")
       }
-      info("ZKConsumerConnector shut down completed")
-    }
+    }
   }
 
   def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
@@ -369,31 +371,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     def syncedRebalance() {
       rebalanceLock synchronized {
-        for (i <- 0 until config.rebalanceMaxRetries) {
-          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-          var done = false
-          val cluster = getCluster(zkClient)
-          try {
-            done = rebalance(cluster)
-          } catch {
-            case e =>
-              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-               * For example, a ZK node can disappear between the time we get all children and the time we try to get
-               * the value of a child. Just let this go since another rebalance will be triggered.
-               **/
-              info("exception during rebalance ", e)
-          }
-          info("end rebalancing consumer " + consumerIdString + " try #" + i)
-          if (done) {
-            return
-          } else {
+        if(isShuttingDown.get())  {
+          return
+        } else {
+          for (i <- 0 until config.rebalanceMaxRetries) {
+            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+            var done = false
+            val cluster = getCluster(zkClient)
+            try {
+              done = rebalance(cluster)
+            } catch {
+              case e =>
+                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                 * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                 * the value of a child. Just let this go since another rebalance will be triggered.
+                 **/
+                info("exception during rebalance ", e)
+            }
+            info("end rebalancing consumer " + consumerIdString + " try #" + i)
+            if (done) {
+              return
+            } else {
               /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                * clear the cache */
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+            }
+            // stop all fetchers and clear all the queues to avoid data duplication
+            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+            Thread.sleep(config.rebalanceBackoffMs)
           }
-          // stop all fetchers and clear all the queues to avoid data duplication
-          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-          Thread.sleep(config.rebalanceBackoffMs)
         }
       }