You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/04 05:51:04 UTC

git commit: kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy

Updated Branches:
  refs/heads/0.8 f89ddced1 -> 20953b525


kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20953b52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20953b52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20953b52

Branch: refs/heads/0.8
Commit: 20953b52558935ba210eaee18e9504bf16bfec27
Parents: f89ddce
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Sep 3 20:50:45 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Sep 3 20:50:45 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala      | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20953b52/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b286312..fa6b213 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -92,7 +92,20 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       leaderForPartitionsMap.foreach {
         case(topicAndPartition, leaderBroker) =>
           val pti = partitionMap(topicAndPartition)
-          addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          try {
+            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          } catch {
+            case t => {
+                if (!isRunning.get())
+                  throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+                else {
+                  warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
+                  lock.lock()
+                  noLeaderPartitionSet += topicAndPartition
+                  lock.unlock()
+                }
+              }
+          }
       }
 
       shutdownIdleFetcherThreads()