You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/29 00:35:01 UTC
git commit: kafka-832;
0.8 Consumer prevents rebalance if consumer thread is blocked or slow;
patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 1d5e95f6c -> 6e05d7da8
kafka-832; 0.8 Consumer prevents rebalance if consumer thread is blocked or slow; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e05d7da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e05d7da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e05d7da
Branch: refs/heads/0.8
Commit: 6e05d7da865898c1c44f552c735484dfe2603b60
Parents: 1d5e95f
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Mar 28 16:34:47 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 28 16:34:47 2013 -0700
----------------------------------------------------------------------
.../kafka/consumer/ConsumerFetcherManager.scala | 39 +++++++--------
.../consumer/ZookeeperConsumerConnector.scala | 4 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 15 +-----
.../scala/unit/kafka/integration/FetcherTest.scala | 4 +-
4 files changed, 25 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 3aa7b08..d1373c9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -43,13 +43,17 @@ class ConsumerFetcherManager(private val consumerIdString: String,
private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
- private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
+ private var leaderFinderThread: ShutdownableThread = null
+
+ private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
lock.lock()
try {
- if (noLeaderPartitionSet.isEmpty)
+ if (noLeaderPartitionSet.isEmpty) {
+ trace("No partition for leader election.")
cond.await()
+ }
try {
trace("Partitions without leader %s".format(noLeaderPartitionSet))
@@ -93,8 +97,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
Thread.sleep(config.refreshLeaderBackoffMs)
}
}
- leaderFinderThread.start()
-
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
new ConsumerFetcherThread(
@@ -103,8 +105,9 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
- if (!leaderFinderThread.isRunning.get())
- throw new RuntimeException("%s already shutdown".format(name))
+ leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
+ leaderFinderThread.start()
+
lock.lock()
try {
partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
@@ -116,16 +119,17 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
}
- def stopAllConnections() {
- lock.lock()
- // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
- noLeaderPartitionSet.clear()
- // second, clear partitionMap
- partitionMap = null
- lock.unlock()
+ def stopConnections() {
+ info("Stopping leader finder thread")
+ if (leaderFinderThread != null) {
+ leaderFinderThread.shutdown()
+ leaderFinderThread = null
+ }
- // third, stop all existing fetchers
+ info("Stopping all fetchers")
closeAllFetchers()
+
+ info("All connections stopped")
}
def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
@@ -141,11 +145,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
lock.unlock()
}
}
-
- def shutdown() {
- info("shutting down")
- leaderFinderThread.shutdown()
- stopAllConnections()
- info("shutdown completed")
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index dcbcf21..9a5fbfe 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (config.autoCommitEnable)
scheduler.shutdownNow()
fetcher match {
- case Some(f) => f.shutdown
+ case Some(f) => f.stopConnections
case None =>
}
sendShutdownToAllQueues()
@@ -483,7 +483,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) =>
- f.stopAllConnections
+ f.stopConnections
clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 4ee23cd..087979f 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -159,7 +159,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
- partitionMapLock.lock()
+ partitionMapLock.lockInterruptibly()
try {
val topicPartition = TopicAndPartition(topic, partitionId)
partitionMap.put(
@@ -172,7 +172,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
def removePartition(topic: String, partitionId: Int) {
- partitionMapLock.lock()
+ partitionMapLock.lockInterruptibly()
try {
partitionMap.remove(TopicAndPartition(topic, partitionId))
} finally {
@@ -180,17 +180,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
- def hasPartition(topic: String, partitionId: Int): Boolean = {
- partitionMapLock.lock()
- try {
- partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined
- } finally {
- partitionMapLock.unlock()
- }
- }
-
def partitionCount() = {
- partitionMapLock.lock()
+ partitionMapLock.lockInterruptibly()
try {
partitionMap.size
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5a57bd1..c5cddea 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -58,12 +58,12 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
- fetcher.stopAllConnections()
+ fetcher.stopConnections()
fetcher.startConnections(topicInfos, cluster)
}
override def tearDown() {
- fetcher.shutdown()
+ fetcher.stopConnections()
super.tearDown
}