You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/04 05:51:04 UTC
git commit: kafka-937; delta patch; ConsumerFetcherThread can deadlock;
patched by Jun Rao; reviewed by Joel Koshy
Updated Branches:
refs/heads/0.8 f89ddced1 -> 20953b525
kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20953b52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20953b52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20953b52
Branch: refs/heads/0.8
Commit: 20953b52558935ba210eaee18e9504bf16bfec27
Parents: f89ddce
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Sep 3 20:50:45 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Sep 3 20:50:45 2013 -0700
----------------------------------------------------------------------
.../kafka/consumer/ConsumerFetcherManager.scala | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/20953b52/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b286312..fa6b213 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -92,7 +92,20 @@ class ConsumerFetcherManager(private val consumerIdString: String,
leaderForPartitionsMap.foreach {
case(topicAndPartition, leaderBroker) =>
val pti = partitionMap(topicAndPartition)
- addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+ try {
+ addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+ } catch {
+ case t => {
+ if (!isRunning.get())
+ throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+ else {
+ warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
+ lock.lock()
+ noLeaderPartitionSet += topicAndPartition
+ lock.unlock()
+ }
+ }
+ }
}
shutdownIdleFetcherThreads()