You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/29 00:35:01 UTC

git commit: kafka-832; 0.8 Consumer prevents rebalance if consumer thread is blocked or slow; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 1d5e95f6c -> 6e05d7da8


kafka-832; 0.8 Consumer prevents rebalance if consumer thread is blocked or slow; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e05d7da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e05d7da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e05d7da

Branch: refs/heads/0.8
Commit: 6e05d7da865898c1c44f552c735484dfe2603b60
Parents: 1d5e95f
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Mar 28 16:34:47 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 28 16:34:47 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala    |   39 +++++++--------
 .../consumer/ZookeeperConsumerConnector.scala      |    4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   15 +-----
 .../scala/unit/kafka/integration/FetcherTest.scala |    4 +-
 4 files changed, 25 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 3aa7b08..d1373c9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -43,13 +43,17 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
-  private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
+  private var leaderFinderThread: ShutdownableThread = null
+
+  private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
       lock.lock()
       try {
-        if (noLeaderPartitionSet.isEmpty)
+        if (noLeaderPartitionSet.isEmpty) {
+          trace("No partition for leader election.")
           cond.await()
+        }
 
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
@@ -93,8 +97,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       Thread.sleep(config.refreshLeaderBackoffMs)
     }
   }
-  leaderFinderThread.start()
-
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
@@ -103,8 +105,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
-    if (!leaderFinderThread.isRunning.get())
-      throw new RuntimeException("%s already shutdown".format(name))
+    leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
+    leaderFinderThread.start()
+
     lock.lock()
     try {
       partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
@@ -116,16 +119,17 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     }
   }
 
-  def stopAllConnections() {
-    lock.lock()
-    // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
-    noLeaderPartitionSet.clear()
-    // second, clear partitionMap
-    partitionMap = null
-    lock.unlock()
+  def stopConnections() {
+    info("Stopping leader finder thread")
+    if (leaderFinderThread != null) {
+      leaderFinderThread.shutdown()
+      leaderFinderThread = null
+    }
 
-    // third, stop all existing fetchers
+    info("Stopping all fetchers")
     closeAllFetchers()
+
+    info("All connections stopped")
   }
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
@@ -141,11 +145,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       lock.unlock()
     }
   }
-
-  def shutdown() {
-    info("shutting down")
-    leaderFinderThread.shutdown()
-    stopAllConnections()
-    info("shutdown completed")
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index dcbcf21..9a5fbfe 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         if (config.autoCommitEnable)
           scheduler.shutdownNow()
         fetcher match {
-          case Some(f) => f.shutdown
+          case Some(f) => f.stopConnections
           case None =>
         }
         sendShutdownToAllQueues()
@@ -483,7 +483,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) =>
-          f.stopAllConnections
+          f.stopConnections
           clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
           info("Committing all offsets after clearing the fetcher queues")
           /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 4ee23cd..087979f 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -159,7 +159,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
-    partitionMapLock.lock()
+    partitionMapLock.lockInterruptibly()
     try {
       val topicPartition = TopicAndPartition(topic, partitionId)
       partitionMap.put(
@@ -172,7 +172,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   def removePartition(topic: String, partitionId: Int) {
-    partitionMapLock.lock()
+    partitionMapLock.lockInterruptibly()
     try {
       partitionMap.remove(TopicAndPartition(topic, partitionId))
     } finally {
@@ -180,17 +180,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     }
   }
 
-  def hasPartition(topic: String, partitionId: Int): Boolean = {
-    partitionMapLock.lock()
-    try {
-      partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined
-    } finally {
-      partitionMapLock.unlock()
-    }
-  }
-
   def partitionCount() = {
-    partitionMapLock.lock()
+    partitionMapLock.lockInterruptibly()
     try {
       partitionMap.size
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5a57bd1..c5cddea 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -58,12 +58,12 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
-    fetcher.stopAllConnections()
+    fetcher.stopConnections()
     fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {
-    fetcher.shutdown()
+    fetcher.stopConnections()
     super.tearDown
   }