You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/12/15 21:23:14 UTC
kafka git commit: KAFKA-4521;
MirrorMaker should flush all messages before releasing partition
ownership during rebalance
Repository: kafka
Updated Branches:
refs/heads/trunk a9687bc0d -> 908b6d114
KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance
Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <do...@linkedin.com>
Reviewers: Jiangjie Qin <be...@gmail.com>
Closes #2241 from lindong28/KAFKA-4521
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/908b6d11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/908b6d11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/908b6d11
Branch: refs/heads/trunk
Commit: 908b6d1148df963d21a70aaa73a7a87571b965a9
Parents: a9687bc
Author: Dong Lin <li...@gmail.com>
Authored: Thu Dec 15 13:23:01 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Thu Dec 15 13:23:01 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/tools/MirrorMaker.scala | 50 ++++++++++++++++++--
1 file changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/908b6d11/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2cfcb95..19a2570 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -470,11 +470,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
def maybeFlushAndCommitOffsets() {
- if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
- debug("Committing MirrorMaker state automatically.")
+ val commitRequested = mirrorMakerConsumer.commitRequested()
+ if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
+ debug("Committing MirrorMaker state.")
producer.flush()
commitOffsets(mirrorMakerConsumer)
lastOffsetCommitMs = System.currentTimeMillis()
+ if (commitRequested)
+ mirrorMakerConsumer.notifyCommit()
}
}
@@ -503,12 +506,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer {
def init()
+ def commitRequested(): Boolean
+ def notifyCommit()
+ def requestAndWaitForCommit()
def hasData : Boolean
}
private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector,
filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
+ private var immediateCommitRequested: Boolean = false
override def init() {
// Creating one stream per each connector instance
@@ -518,6 +525,29 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
iter = stream.iterator()
}
+ override def requestAndWaitForCommit() {
+ this.synchronized {
+ // skip wait() if mirrorMakerConsumer has not been initialized
+ if (iter != null) {
+ immediateCommitRequested = true
+ this.wait()
+ }
+ }
+ }
+
+ override def notifyCommit() {
+ this.synchronized {
+ immediateCommitRequested = false
+ this.notifyAll()
+ }
+ }
+
+ override def commitRequested(): Boolean = {
+ this.synchronized {
+ immediateCommitRequested
+ }
+ }
+
override def hasData = iter.hasNext()
override def receive() : BaseConsumerRecord = {
@@ -570,6 +600,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
+ override def requestAndWaitForCommit() {
+ // Do nothing
+ }
+
+ override def notifyCommit() {
+ // Do nothing
+ }
+
+ override def commitRequested(): Boolean = {
+ false
+ }
+
override def hasData = true
override def receive() : BaseConsumerRecord = {
@@ -632,8 +674,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
extends ConsumerRebalanceListener {
override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
- producer.flush()
- commitOffsets(mirrorMakerConsumer)
+ // The zookeeper listener thread, which executes this method, needs to wait for MirrorMakerThread to flush data and commit offset
+ mirrorMakerConsumer.requestAndWaitForCommit()
// invoke custom consumer rebalance listener
customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership))
}