You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/17 03:14:25 UTC

svn commit: r1245295 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Author: junrao
Date: Fri Feb 17 02:14:25 2012
New Revision: 1245295

URL: http://svn.apache.org/viewvc?rev=1245295&view=rev
Log:
shutdown watch executor thread properly; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-265

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1245295&r1=1245294&r2=1245295&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Fri Feb 17 02:14:25 2012
@@ -379,17 +379,20 @@ private[kafka] class ZookeeperConsumerCo
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
         info("starting watcher executor thread for consumer " + consumerIdString)
+        var doRebalance = false
         while (!isShuttingDown.get) {
           try {
             lock.lock()
             try {
               if (!isWatcherTriggered)
-                cond.await()
+                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
             } finally {
+              doRebalance = isWatcherTriggered
               isWatcherTriggered = false
               lock.unlock()
             }
-            syncedRebalance
+            if (doRebalance)
+              syncedRebalance
           } catch {
             case t => error("error during syncedRebalance", t)
           }