You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/12/15 21:23:14 UTC

kafka git commit: KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance

Repository: kafka
Updated Branches:
  refs/heads/trunk a9687bc0d -> 908b6d114


KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance

Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <do...@linkedin.com>

Reviewers: Jiangjie Qin <be...@gmail.com>

Closes #2241 from lindong28/KAFKA-4521


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/908b6d11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/908b6d11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/908b6d11

Branch: refs/heads/trunk
Commit: 908b6d1148df963d21a70aaa73a7a87571b965a9
Parents: a9687bc
Author: Dong Lin <li...@gmail.com>
Authored: Thu Dec 15 13:23:01 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Thu Dec 15 13:23:01 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 50 ++++++++++++++++++--
 1 file changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/908b6d11/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2cfcb95..19a2570 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -470,11 +470,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     def maybeFlushAndCommitOffsets() {
-      if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
-        debug("Committing MirrorMaker state automatically.")
+      val commitRequested = mirrorMakerConsumer.commitRequested()
+      if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
+        debug("Committing MirrorMaker state.")
         producer.flush()
         commitOffsets(mirrorMakerConsumer)
         lastOffsetCommitMs = System.currentTimeMillis()
+        if (commitRequested)
+          mirrorMakerConsumer.notifyCommit()
       }
     }
 
@@ -503,12 +506,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer {
     def init()
+    def commitRequested(): Boolean
+    def notifyCommit()
+    def requestAndWaitForCommit()
     def hasData : Boolean
   }
 
   private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector,
                                        filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
     private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
+    private var immediateCommitRequested: Boolean = false
 
     override def init() {
       // Creating one stream per each connector instance
@@ -518,6 +525,29 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       iter = stream.iterator()
     }
 
+    override def requestAndWaitForCommit() {
+      this.synchronized {
+        // skip wait() if mirrorMakerConsumer has not been initialized
+        if (iter != null) {
+          immediateCommitRequested = true
+          this.wait()
+        }
+      }
+    }
+
+    override def notifyCommit() {
+      this.synchronized {
+        immediateCommitRequested = false
+        this.notifyAll()
+      }
+    }
+
+    override def commitRequested(): Boolean = {
+      this.synchronized {
+        immediateCommitRequested
+      }
+    }
+
     override def hasData = iter.hasNext()
 
     override def receive() : BaseConsumerRecord = {
@@ -570,6 +600,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       }
     }
 
+    override def requestAndWaitForCommit() {
+      // Do nothing
+    }
+
+    override def notifyCommit() {
+      // Do nothing
+    }
+    
+    override def commitRequested(): Boolean = {
+      false
+    }
+
     override def hasData = true
 
     override def receive() : BaseConsumerRecord = {
@@ -632,8 +674,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     extends ConsumerRebalanceListener {
 
     override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
-      producer.flush()
-      commitOffsets(mirrorMakerConsumer)
+      // The zookeeper listener thread, which executes this method, needs to wait for MirrorMakerThread to flush data and commit offset
+      mirrorMakerConsumer.requestAndWaitForCommit()
       // invoke custom consumer rebalance listener
       customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership))
     }