You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/17 03:14:25 UTC
svn commit: r1245295 -
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Author: junrao
Date: Fri Feb 17 02:14:25 2012
New Revision: 1245295
URL: http://svn.apache.org/viewvc?rev=1245295&view=rev
Log:
shutdown watch executor thread properly; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-265
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1245295&r1=1245294&r2=1245295&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Fri Feb 17 02:14:25 2012
@@ -379,17 +379,20 @@ private[kafka] class ZookeeperConsumerCo
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
info("starting watcher executor thread for consumer " + consumerIdString)
+ var doRebalance = false
while (!isShuttingDown.get) {
try {
lock.lock()
try {
if (!isWatcherTriggered)
- cond.await()
+ cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
} finally {
+ doRebalance = isWatcherTriggered
isWatcherTriggered = false
lock.unlock()
}
- syncedRebalance
+ if (doRebalance)
+ syncedRebalance
} catch {
case t => error("error during syncedRebalance", t)
}