You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/18 22:58:33 UTC

kafka git commit: KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right

Repository: kafka
Updated Branches:
  refs/heads/trunk 5bbbb7fdd -> 82789e751


KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82789e75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82789e75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82789e75

Branch: refs/heads/trunk
Commit: 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
Parents: 5bbbb7f
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Mar 18 14:58:11 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 18 14:58:11 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/82789e75/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 11acc31..4f3c4c8 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -33,7 +33,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, Logging, Utils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata}
 
 /**
  * The mirror maker has the following architecture:
@@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM
  *            acks=all
  *            retries=max integer
  *            block.on.buffer.full=true
+ *            max.in.flight.requests.per.connection=1
  *       2. Consumer Settings
  *            auto.commit.enable=false
  *       3. Mirror Maker Setting:
@@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
-  // Track the messages unacked for consumer rebalance
+  // Track the messages not successfully sent by mirror maker.
   private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
   private var messageHandler: MirrorMakerMessageHandler = null
   private var offsetCommitIntervalMs = 0
@@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .describedAs("config file")
       .ofType(classOf[String])
 
-    // Please see note about MaxInflightRequests
     val producerConfigOpt = parser.accepts("producer.config",
       "Embedded producer config.")
       .withRequiredArg()
@@ -179,9 +179,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     // create producer
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
     // Defaults to no data loss settings.
-    maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString)
-    maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true")
-    maybeSetDefaultProperty(producerProps, "acks", "all")
+    maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
+    maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
+    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     producer = new MirrorMakerProducer(producerProps)
 
     // Create consumer connector
@@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     messageHandler = {
       if (customMessageHandlerClass != null) {
         if (messageHandlerArgs != null)
-          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs)
+          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
         else
           Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
       } else {
@@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
         if (abortOnSendFailure)
           exitingOnSendFailure = true
+        numDroppedMessages.incrementAndGet()
       }
     }
   }
 
-
   private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
                                           customRebalanceListener: Option[ConsumerRebalanceListener])
     extends ConsumerRebalanceListener {