You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2013/05/23 01:26:26 UTC
git commit: KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede
Updated Branches:
refs/heads/0.8 32cd8994b -> ffd84eb23
KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd84eb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd84eb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd84eb2
Branch: refs/heads/0.8
Commit: ffd84eb23fcbb279a63ba5d4cb72077a0c079cff
Parents: 32cd899
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed May 22 16:26:04 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed May 22 16:26:04 2013 -0700
----------------------------------------------------------------------
.../kafka/consumer/ConsumerFetcherManager.scala | 28 ++++++++++++-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 32 ++++++++++----
2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 658b5c1..db104f1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -85,13 +85,32 @@ class ConsumerFetcherManager(private val consumerIdString: String,
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
noLeaderPartitionSet -= topicAndPartition
} catch {
- case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+ case t => {
+ /*
+ * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
+ * processing subsequent partitions without leader so the leader-finder-thread can exit.
+ * It is unfortunate that we depend on the following behavior and we should redesign this: as part of
+ * processing partitions, we catch the InterruptedException (thrown from addPartition's call to
+ * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
+ * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
+ * and we can run into the deadlock described in KAFKA-914.
+ */
+ if (!isRunning.get())
+ throw t
+ else
+ warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+ }
}
}
shutdownIdleFetcherThreads()
} catch {
- case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+ case t => {
+ if (!isRunning.get())
+ throw t /* See above for why we need to propagate this exception. */
+ else
+ warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+ }
}
} finally {
lock.unlock()
@@ -122,6 +141,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
def stopConnections() {
+ /*
+ * Stop the leader finder thread first before stopping fetchers. Otherwise, if there are more partitions without
+ * leader, then the leader finder thread will process these partitions (before shutting down) and add fetchers for
+ * these partitions.
+ */
info("Stopping leader finder thread")
if (leaderFinderThread != null) {
leaderFinderThread.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 3d22dc7..2d93947 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -28,7 +28,6 @@ import collection.mutable.ListBuffer
import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
import kafka.javaapi
-
object MirrorMaker extends Logging {
def main(args: Array[String]) {
@@ -114,23 +113,33 @@ object MirrorMaker extends Logging {
else
new Blacklist(options.valueOf(blacklistOpt))
- val streams =
- connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+ var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
+ try {
+ streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
+ } catch {
+ case t =>
+ fatal("Unable to create stream - shutting down mirror maker.")
+ connectors.foreach(_.shutdown)
+ }
val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
val consumerThreads =
- streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+ streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
val producerThreads = new ListBuffer[ProducerThread]()
+ def cleanShutdown() {
+ connectors.foreach(_.shutdown)
+ consumerThreads.foreach(_.awaitShutdown)
+ producerThreads.foreach(_.shutdown)
+ producerThreads.foreach(_.awaitShutdown)
+ info("Kafka mirror maker shutdown successfully")
+ }
+
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
- connectors.foreach(_.shutdown)
- consumerThreads.foreach(_.awaitShutdown)
- producerThreads.foreach(_.shutdown)
- producerThreads.foreach(_.awaitShutdown)
- logger.info("Kafka migration tool shutdown successfully");
+ cleanShutdown()
}
})
@@ -145,6 +154,10 @@ object MirrorMaker extends Logging {
consumerThreads.foreach(_.start)
producerThreads.foreach(_.start)
+
+ // in case the consumer threads hit a timeout/other exception
+ consumerThreads.foreach(_.awaitShutdown)
+ cleanShutdown()
}
class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
@@ -158,6 +171,7 @@ object MirrorMaker extends Logging {
this.setName(threadName)
override def run() {
+ info("Starting mirror maker thread " + threadName)
try {
for (msgAndMetadata <- stream) {
val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)