You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:04 UTC
[19/36] git commit: kafka-989; Race condition shutting down high-level consumer results in spinning background thread; patched by Phil Hargett; reviewed by Jun Rao
kafka-989; Race condition shutting down high-level consumer results in spinning background thread; patched by Phil Hargett; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d6ad3d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d6ad3d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d6ad3d7
Branch: refs/heads/trunk
Commit: 1d6ad3d7d441bc7174a445a4e5e49319ee1003eb
Parents: 7fbbb66
Author: Phil Hargett <ph...@mirror-image.com>
Authored: Mon Aug 5 18:59:34 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Aug 5 18:59:34 2013 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 94 +++++++++++---------
1 file changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d6ad3d7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..0ca2850 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -153,32 +153,34 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
def shutdown() {
- val canShutdown = isShuttingDown.compareAndSet(false, true);
- if (canShutdown) {
- info("ZKConsumerConnector shutting down")
+ rebalanceLock synchronized {
+ val canShutdown = isShuttingDown.compareAndSet(false, true);
+ if (canShutdown) {
+ info("ZKConsumerConnector shutting down")
- if (wildcardTopicWatcher != null)
- wildcardTopicWatcher.shutdown()
- try {
- if (config.autoCommitEnable)
- scheduler.shutdownNow()
- fetcher match {
- case Some(f) => f.stopConnections
- case None =>
- }
- sendShutdownToAllQueues()
- if (config.autoCommitEnable)
- commitOffsets()
- if (zkClient != null) {
- zkClient.close()
- zkClient = null
+ if (wildcardTopicWatcher != null)
+ wildcardTopicWatcher.shutdown()
+ try {
+ if (config.autoCommitEnable)
+ scheduler.shutdownNow()
+ fetcher match {
+ case Some(f) => f.stopConnections
+ case None =>
+ }
+ sendShutdownToAllQueues()
+ if (config.autoCommitEnable)
+ commitOffsets()
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+ } catch {
+ case e =>
+ fatal("error during consumer connector shutdown", e)
}
- } catch {
- case e =>
- fatal("error during consumer connector shutdown", e)
+ info("ZKConsumerConnector shut down completed")
}
- info("ZKConsumerConnector shut down completed")
- }
+ }
}
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
@@ -369,31 +371,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def syncedRebalance() {
rebalanceLock synchronized {
- for (i <- 0 until config.rebalanceMaxRetries) {
- info("begin rebalancing consumer " + consumerIdString + " try #" + i)
- var done = false
- val cluster = getCluster(zkClient)
- try {
- done = rebalance(cluster)
- } catch {
- case e =>
- /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
- * For example, a ZK node can disappear between the time we get all children and the time we try to get
- * the value of a child. Just let this go since another rebalance will be triggered.
- **/
- info("exception during rebalance ", e)
- }
- info("end rebalancing consumer " + consumerIdString + " try #" + i)
- if (done) {
- return
- } else {
+ if(isShuttingDown.get()) {
+ return
+ } else {
+ for (i <- 0 until config.rebalanceMaxRetries) {
+ info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+ var done = false
+ val cluster = getCluster(zkClient)
+ try {
+ done = rebalance(cluster)
+ } catch {
+ case e =>
+ /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+ * For example, a ZK node can disappear between the time we get all children and the time we try to get
+ * the value of a child. Just let this go since another rebalance will be triggered.
+ **/
+ info("exception during rebalance ", e)
+ }
+ info("end rebalancing consumer " + consumerIdString + " try #" + i)
+ if (done) {
+ return
+ } else {
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+ }
+ // stop all fetchers and clear all the queues to avoid data duplication
+ closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+ Thread.sleep(config.rebalanceBackoffMs)
}
- // stop all fetchers and clear all the queues to avoid data duplication
- closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
- Thread.sleep(config.rebalanceBackoffMs)
}
}