You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/03 22:45:27 UTC
git commit: KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 3c27988ca -> bd262ac70
KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd262ac7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd262ac7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd262ac7
Branch: refs/heads/0.8
Commit: bd262ac708062e502406e8d775f4c9432a5364e7
Parents: 3c27988
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Apr 3 13:43:50 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Apr 3 13:43:50 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/KafkaMigrationTool.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd262ac7/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index a15b350..3c18286 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -385,8 +385,10 @@ public class KafkaMigrationTool {
try{
while(true) {
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
- if(!data.equals(shutdownMessage))
+ if(!data.equals(shutdownMessage)) {
producer.send(data);
+ if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+ }
else
break;
}
@@ -410,6 +412,7 @@ public class KafkaMigrationTool {
public void awaitShutdown() {
try {
shutdownComplete.await();
+ producer.close();
logger.info("Producer thread " + threadName + " shutdown complete");
} catch(InterruptedException ie) {
logger.warn("Interrupt during shutdown of ProducerThread", ie);