You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/03 22:45:27 UTC

git commit: KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 3c27988ca -> bd262ac70


KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd262ac7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd262ac7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd262ac7

Branch: refs/heads/0.8
Commit: bd262ac708062e502406e8d775f4c9432a5364e7
Parents: 3c27988
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Apr 3 13:43:50 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Apr 3 13:43:50 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    5 ++++-
 1 files changed, 4 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd262ac7/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index a15b350..3c18286 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -385,8 +385,10 @@ public class KafkaMigrationTool {
       try{
         while(true) {
           KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
-          if(!data.equals(shutdownMessage))
+          if(!data.equals(shutdownMessage)) {
             producer.send(data);
+            if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+          }
           else
             break;
         }
@@ -410,6 +412,7 @@ public class KafkaMigrationTool {
     public void awaitShutdown() {
       try {
         shutdownComplete.await();
+        producer.close();
         logger.info("Producer thread " + threadName + " shutdown complete");
       } catch(InterruptedException ie) {
         logger.warn("Interrupt during shutdown of ProducerThread", ie);