Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/27 16:00:19 UTC

kafka git commit: KAFKA-2452: Add new consumer option to mirror maker.

Repository: kafka
Updated Branches:
  refs/heads/trunk 2e4aed707 -> 2fd645ac2


KAFKA-2452: Add new consumer option to mirror maker.

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ben Stopford, Guozhang Wang

Closes #266 from becketqin/KAFKA-2452
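
For reference, a sketch of how the new option might be invoked once this patch is
applied. The wrapper script and property file names below are illustrative
assumptions; the --new.consumer flag itself, and the fact that it supports only
--whitelist, come from this patch:

  bin/kafka-run-class.sh kafka.tools.MirrorMaker \
    --new.consumer \
    --consumer.config source-consumer.properties \
    --producer.config target-producer.properties \
    --whitelist ".*" \
    --num.streams 2

With --new.consumer, the consumer config should hold new-consumer settings such as
bootstrap.servers and group.id; as the diff below shows, mirror maker hardcodes the
key and value deserializers to ByteArrayDeserializer and defaults
enable.auto.commit to false.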


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fd645ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fd645ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fd645ac

Branch: refs/heads/trunk
Commit: 2fd645ac2fec7cf089cb8175ee47823b67a07226
Parents: 2e4aed7
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Oct 27 07:59:52 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 27 07:59:52 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     |  12 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 554 ++++++++++++-------
 2 files changed, 373 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 8b93493..52cd5fa 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -28,13 +28,15 @@ trait BaseConsumer {
   def receive(): BaseConsumerRecord
   def stop()
   def cleanup()
+  def commit()
 }
 
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
 
 class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
-  import scala.collection.JavaConversions._
+
+import scala.collection.JavaConversions._
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
   consumer.subscribe(List(topic))
@@ -58,6 +60,10 @@ class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs:
   override def cleanup() {
     this.consumer.close()
   }
+
+  override def commit() {
+    this.consumer.commitSync()
+  }
 }
 
 class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
@@ -81,5 +87,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   override def cleanup() {
     this.consumerConnector.shutdown()
   }
+
+  override def commit() {
+    this.consumerConnector.commitOffsets
+  }
 }
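
The commit() added above is what lets MirrorMaker (next file) commit offsets
through the common trait instead of holding a ZookeeperConsumerConnector directly.
A minimal sketch of the pattern; the MirrorOnce object and copyOne helper are
hypothetical, not part of this patch:

  import kafka.consumer.{BaseConsumer, BaseConsumerRecord}

  object MirrorOnce {
    // Hypothetical helper: receive one record, process it, then commit through
    // the shared trait. The same code drives NewShinyConsumer (commitSync) and
    // OldConsumer (commitOffsets) because both now implement commit().
    def copyOne(consumer: BaseConsumer)(process: BaseConsumerRecord => Unit): Unit = {
      val record = consumer.receive() // may throw ConsumerTimeoutException (old consumer)
      process(record)
      consumer.commit()
    }
  }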
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index fbe0c83..3cf754b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,22 +20,27 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.client.ClientUtils
+import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
+import org.apache.kafka.clients.consumer.{ConsumerWakeupException, Consumer, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConversions._
-
+import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
@@ -56,12 +61,11 @@ import scala.collection.JavaConversions._
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
-  private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
+  private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
   private var messageHandler: MirrorMakerMessageHandler = null
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
@@ -78,120 +82,206 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   def main(args: Array[String]) {
 
     info("Starting mirror maker")
-    val parser = new OptionParser
-
-    val consumerConfigOpt = parser.accepts("consumer.config",
-      "Embedded consumer config for consuming from the source cluster.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(classOf[String])
-
-    val producerConfigOpt = parser.accepts("producer.config",
-      "Embedded producer config.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(classOf[String])
-
-    val numStreamsOpt = parser.accepts("num.streams",
-      "Number of consumption streams.")
-      .withRequiredArg()
-      .describedAs("Number of threads")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-
-    val whitelistOpt = parser.accepts("whitelist",
-      "Whitelist of topics to mirror.")
-      .withRequiredArg()
-      .describedAs("Java regex (String)")
-      .ofType(classOf[String])
-
-    val blacklistOpt = parser.accepts("blacklist",
-      "Blacklist of topics to mirror.")
-      .withRequiredArg()
-      .describedAs("Java regex (String)")
-      .ofType(classOf[String])
-
-    val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
-      "Offset commit interval in ms")
-      .withRequiredArg()
-      .describedAs("offset commit interval in millisecond")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(60000)
-
-    val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
-      "The consumer rebalance listener to use for mirror maker consumer.")
-      .withRequiredArg()
-      .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
-      .ofType(classOf[String])
-
-    val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
-      "Arguments used by custom rebalance listener for mirror maker consumer")
-      .withRequiredArg()
-      .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
-      .ofType(classOf[String])
-
-    val messageHandlerOpt = parser.accepts("message.handler",
-      "Message handler which will process every record in-between consumer and producer.")
-      .withRequiredArg()
-      .describedAs("A custom message handler of type MirrorMakerMessageHandler")
-      .ofType(classOf[String])
-
-    val messageHandlerArgsOpt = parser.accepts("message.handler.args",
-      "Arguments used by custom rebalance listener for mirror maker consumer")
-      .withRequiredArg()
-      .describedAs("Arguments passed to message handler constructor.")
-      .ofType(classOf[String])
-
-    val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
-      "Configure the mirror maker to exit on a failed send.")
-      .withRequiredArg()
-      .describedAs("Stop the entire mirror maker when a send failure occurs")
-      .ofType(classOf[String])
-      .defaultsTo("true")
-
-    val helpOpt = parser.accepts("help", "Print this message.")
-
-    if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
-
-
-    val options = parser.parse(args: _*)
-
-    if (options.has(helpOpt)) {
-      parser.printHelpOn(System.out)
-      System.exit(0)
-    }
+    try {
+      val parser = new OptionParser
+
+      val consumerConfigOpt = parser.accepts("consumer.config",
+        "Embedded consumer config for consuming from the source cluster.")
+        .withRequiredArg()
+        .describedAs("config file")
+        .ofType(classOf[String])
+
+      val useNewConsumerOpt = parser.accepts("new.consumer",
+        "Use new consumer in mirror maker.")
+
+      val producerConfigOpt = parser.accepts("producer.config",
+        "Embedded producer config.")
+        .withRequiredArg()
+        .describedAs("config file")
+        .ofType(classOf[String])
+
+      val numStreamsOpt = parser.accepts("num.streams",
+        "Number of consumption streams.")
+        .withRequiredArg()
+        .describedAs("Number of threads")
+        .ofType(classOf[java.lang.Integer])
+        .defaultsTo(1)
+
+      val whitelistOpt = parser.accepts("whitelist",
+        "Whitelist of topics to mirror.")
+        .withRequiredArg()
+        .describedAs("Java regex (String)")
+        .ofType(classOf[String])
+
+      val blacklistOpt = parser.accepts("blacklist",
+        "Blacklist of topics to mirror. Only old consumer supports blacklist.")
+        .withRequiredArg()
+        .describedAs("Java regex (String)")
+        .ofType(classOf[String])
+
+      val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
+        "Offset commit interval in ms")
+        .withRequiredArg()
+        .describedAs("offset commit interval in millisecond")
+        .ofType(classOf[java.lang.Integer])
+        .defaultsTo(60000)
+
+      val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
+        "The consumer rebalance listener to use for mirror maker consumer.")
+        .withRequiredArg()
+        .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
+        .ofType(classOf[String])
+
+      val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
+        "Arguments used by custom rebalance listener for mirror maker consumer")
+        .withRequiredArg()
+        .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
+        .ofType(classOf[String])
+
+      val messageHandlerOpt = parser.accepts("message.handler",
+        "Message handler which will process every record in-between consumer and producer.")
+        .withRequiredArg()
+        .describedAs("A custom message handler of type MirrorMakerMessageHandler")
+        .ofType(classOf[String])
+
+      val messageHandlerArgsOpt = parser.accepts("message.handler.args",
+        "Arguments used by custom rebalance listener for mirror maker consumer")
+        .withRequiredArg()
+        .describedAs("Arguments passed to message handler constructor.")
+        .ofType(classOf[String])
+
+      val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
+        "Configure the mirror maker to exit on a failed send.")
+        .withRequiredArg()
+        .describedAs("Stop the entire mirror maker when a send failure occurs")
+        .ofType(classOf[String])
+        .defaultsTo("true")
+
+      val helpOpt = parser.accepts("help", "Print this message.")
+
+      if (args.length == 0)
+        CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
+
+
+      val options = parser.parse(args: _*)
+
+      if (options.has(helpOpt)) {
+        parser.printHelpOn(System.out)
+        System.exit(0)
+      }
 
-    CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
-    if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
-      println("Exactly one of whitelist or blacklist is required.")
-      System.exit(1)
-    }
+      CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
+      if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
+        println("Exactly one of whitelist or blacklist is required.")
+        System.exit(1)
+      }
+
+      abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
+      offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
+      val numStreams = options.valueOf(numStreamsOpt).intValue()
 
-    abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
-    offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
-    val numStreams = options.valueOf(numStreamsOpt).intValue()
+      Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
+        override def run() {
+          cleanShutdown()
+        }
+      })
+
+      // create producer
+      val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+      // Defaults to no data loss settings.
+      maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
+      maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+      maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
+      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+      // Always set producer key and value serializer to ByteArraySerializer.
+      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+      producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+      producer = new MirrorMakerProducer(producerProps)
+
+      val useNewConsumer = options.has(useNewConsumerOpt)
+
+      // Create consumers
+      val mirrorMakerConsumers = if (!useNewConsumer) {
+        val customRebalanceListener = {
+          val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+          if (customRebalanceListenerClass != null) {
+            val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+            if (rebalanceListenerArgs != null) {
+              Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+            } else {
+              Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
+            }
+          } else {
+            None
+          }
+        }
+
+        if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
+          throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
+        createOldConsumers(
+          numStreams,
+          options.valueOf(consumerConfigOpt),
+          customRebalanceListener,
+          Option(options.valueOf(whitelistOpt)),
+          Option(options.valueOf(blacklistOpt)))
+      } else {
+        val customRebalanceListener = {
+          val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+          if (customRebalanceListenerClass != null) {
+            val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+            if (rebalanceListenerArgs != null) {
+                Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+            } else {
+                Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass))
+            }
+          } else {
+            None
+          }
+        }
+        if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
+          throw new IllegalArgumentException("The rebalance listener should be an instance of" +
+            "org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
+        createNewConsumers(
+          numStreams,
+          options.valueOf(consumerConfigOpt),
+          customRebalanceListener,
+          Option(options.valueOf(whitelistOpt)))
+      }
 
-    Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
-      override def run() {
-        cleanShutdown()
+      // Create mirror maker threads.
+      mirrorMakerThreads = (0 until numStreams) map (i =>
+        new MirrorMakerThread(mirrorMakerConsumers(i), i))
+
+      // Create and initialize message handler
+      val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
+      val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
+      messageHandler = {
+        if (customMessageHandlerClass != null) {
+          if (messageHandlerArgs != null)
+            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
+          else
+            CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
+        } else {
+          defaultMirrorMakerMessageHandler
+        }
       }
-    })
-    
-    // create producer
-    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
-    // Defaults to no data loss settings.
-    maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
-    maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
-    maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
-    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-    // Always set producer key and value serializer to ByteArraySerializer.
-    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producer = new MirrorMakerProducer(producerProps)
+    } catch {
+      case ct : ControlThrowable => throw ct
+      case t : Throwable =>
+        error("Exception when starting mirror maker.", t)
+    }
 
+    mirrorMakerThreads.foreach(_.start())
+    mirrorMakerThreads.foreach(_.awaitShutdown())
+  }
+
+  private def createOldConsumers(numStreams: Int,
+                                consumerConfigPath: String,
+                                customRebalanceListener: Option[ConsumerRebalanceListener],
+                                whitelist: Option[String],
+                                blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
     // Create consumer connector
-    val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+    val consumerConfigProps = Utils.loadProps(consumerConfigPath)
     // Disable consumer auto offsets commit to prevent data loss.
     maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
     // Set the consumer timeout so we will not block for low volume pipeline. The timeout is necessary to make sure
@@ -199,65 +289,52 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000")
     // The default client id is the group id; we manually set the client id to groupId-index to avoid metric collisions
     val groupIdString = consumerConfigProps.getProperty("group.id")
-    connectors = (0 until numStreams) map { i =>
+    val connectors = (0 until numStreams) map { i =>
       consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
       val consumerConfig = new ConsumerConfig(consumerConfigProps)
       new ZookeeperConsumerConnector(consumerConfig)
     }
 
-    // Set consumer rebalance listener.
-    // Custom rebalance listener will be invoked after internal listener finishes its work.
-    val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
-    val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
-    val customRebalanceListener = {
-      if (customRebalanceListenerClass != null) {
-        if (rebalanceListenerArgs != null)
-          Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
-        else
-          Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
-      } else {
-        None
-      }
-    }
-    connectors.foreach {
-      connector =>
-        val consumerRebalanceListener = new InternalRebalanceListener(connector, customRebalanceListener)
-        connector.setConsumerRebalanceListener(consumerRebalanceListener)
-    }
-
     // create filters
-    val filterSpec = if (options.has(whitelistOpt))
-      new Whitelist(options.valueOf(whitelistOpt))
+    val filterSpec = if (whitelist.isDefined)
+      new Whitelist(whitelist.get)
+    else if (blacklist.isDefined)
+      new Blacklist(blacklist.get)
     else
-      new Blacklist(options.valueOf(blacklistOpt))
-
-    // Create mirror maker threads
-    mirrorMakerThreads = (0 until numStreams) map ( i =>
-        new MirrorMakerThread(connectors(i), filterSpec, i)
-    )
-
-    // Create and initialize message handler
-    val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
-    val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
-    messageHandler = {
-      if (customMessageHandlerClass != null) {
-        if (messageHandlerArgs != null)
-          CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
-        else
-          CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
-      } else {
-        defaultMirrorMakerMessageHandler
-      }
+      throw new IllegalArgumentException("Either whitelist or blacklist should be defined!")
+    (0 until numStreams) map { i =>
+      val consumer = new MirrorMakerOldConsumer(connectors(i), filterSpec)
+      val consumerRebalanceListener = new InternalRebalanceListenerForOldConsumer(consumer, customRebalanceListener)
+      connectors(i).setConsumerRebalanceListener(consumerRebalanceListener)
+      consumer
     }
+  }
 
-    mirrorMakerThreads.foreach(_.start())
-    mirrorMakerThreads.foreach(_.awaitShutdown())
+  def createNewConsumers(numStreams: Int,
+                         consumerConfigPath: String,
+                         customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
+                         whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
+    // Create consumer connector
+    val consumerConfigProps = Utils.loadProps(consumerConfigPath)
+    // Disable consumer auto offsets commit to prevent data loss.
+    maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
+    // Hardcode the deserializer to ByteArrayDeserializer
+    consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
+    consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
+    // The default client id is the group id; we manually set the client id to groupId-index to avoid metric collisions
+    val groupIdString = consumerConfigProps.getProperty("group.id")
+    val consumers = (0 until numStreams) map { i =>
+      consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
+      new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
+    }
+    if (whitelist.isEmpty) throw new IllegalArgumentException("Whitelist cannot be empty for new consumer")
+    consumers.map(consumer => new MirrorMakerNewConsumer(consumer, customRebalanceListener, whitelist))
   }
 
-  def commitOffsets(connector: ZookeeperConsumerConnector) {
+  def commitOffsets(mirrorMakerConsumer: MirrorMakerBaseConsumer) {
     if (!exitingOnSendFailure) {
       trace("Committing offsets.")
-      connector.commitOffsets
+      mirrorMakerConsumer.commit()
     } else {
       info("Exiting on send failure, skip committing offsets.")
     }
@@ -285,8 +362,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue))
   }
 
-  class MirrorMakerThread(connector: ZookeeperConsumerConnector,
-                          filterSpec: TopicFilter,
+  class MirrorMakerThread(mirrorMakerConsumer: MirrorMakerBaseConsumer,
                           val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-thread-" + threadId
     private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
@@ -298,25 +374,24 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def run() {
       info("Starting mirror maker thread " + threadName)
+      mirrorMakerConsumer.init()
       try {
-        // Creating one stream per each connector instance
-        val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
-        require(streams.size == 1)
-        val stream = streams(0)
-        val iter = stream.iterator()
-
+        // We need the two while loops so that, with the old consumer, we still commit offsets
+        // even when there are no messages. With the new consumer this is handled by poll(timeout).
         while (!exitingOnSendFailure && !shuttingDown) {
           try {
-            while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
-              val data = iter.next()
-              trace("Sending message with value size %d".format(data.message().size))
+            while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
+              val data = mirrorMakerConsumer.receive()
+              trace("Sending message with value size %d".format(data.value.length))
               val records = messageHandler.handle(data)
               records.foreach(producer.send)
               maybeFlushAndCommitOffsets()
             }
           } catch {
-            case e: ConsumerTimeoutException =>
+            case cte: ConsumerTimeoutException =>
               trace("Caught ConsumerTimeoutException, continue iteration.")
+            case cwe: ConsumerWakeupException =>
+              trace("Caught ConsumerWakeupException, continue iteration.")
           }
           maybeFlushAndCommitOffsets()
         }
@@ -327,9 +402,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         info("Flushing producer.")
         producer.flush()
         info("Committing consumer offsets.")
-        commitOffsets(connector)
+        commitOffsets(mirrorMakerConsumer)
         info("Shutting down consumer connectors.")
-        connector.shutdown()
+        mirrorMakerConsumer.stop()
+        mirrorMakerConsumer.cleanup()
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker
@@ -343,7 +419,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     def maybeFlushAndCommitOffsets() {
       if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
         producer.flush()
-        commitOffsets(connector)
+        commitOffsets(mirrorMakerConsumer)
         lastOffsetCommitMs = System.currentTimeMillis()
       }
     }
@@ -352,6 +428,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       try {
         info(threadName + " shutting down")
         shuttingDown = true
+        mirrorMakerConsumer.stop()
       }
       catch {
         case ie: InterruptedException =>
@@ -370,6 +447,113 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
+  private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer {
+    def init()
+    def hasData : Boolean
+  }
+
+  private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector,
+                                       filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
+    private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
+
+    override def init() {
+      // Creating one stream per each connector instance
+      val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
+      require(streams.size == 1)
+      val stream = streams(0)
+      iter = stream.iterator()
+    }
+
+    override def hasData = iter.hasNext()
+
+    override def receive() : BaseConsumerRecord = {
+      val messageAndMetadata = iter.next()
+      BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+    }
+
+    override def stop() {
+      // Do nothing
+    }
+
+    override def cleanup() {
+      connector.shutdown()
+    }
+
+    override def commit() {
+      connector.commitOffsets
+    }
+  }
+
+  private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+                                       customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
+                                       whitelistOpt: Option[String])
+    extends MirrorMakerBaseConsumer {
+    val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
+    var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+
+    override def init() {
+      debug("Initiating new consumer")
+      val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
+      consumer.subscribe(Pattern.compile(regex), consumerRebalanceListener)
+    }
+
+    // New consumer always hasNext
+    override def hasData = true
+
+    override def receive() : BaseConsumerRecord = {
+      while (recordIter == null || !recordIter.hasNext)
+        recordIter = consumer.poll(1000).iterator
+
+      val record = recordIter.next()
+      BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+    }
+
+    override def stop() {
+      consumer.wakeup()
+    }
+
+    override def cleanup() {
+      ClientUtils.swallow(consumer.close())
+    }
+
+    override def commit() {
+      consumer.commitSync()
+    }
+  }
+
+  private class InternalRebalanceListenerForNewConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
+                                                        customRebalanceListenerForNewConsumer: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])
+    extends org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
+
+    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
+      producer.flush()
+      commitOffsets(mirrorMakerConsumer)
+      customRebalanceListenerForNewConsumer.foreach(_.onPartitionsRevoked(partitions))
+    }
+
+    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
+      customRebalanceListenerForNewConsumer.foreach(_.onPartitionsAssigned(partitions))
+    }
+  }
+
+  private class InternalRebalanceListenerForOldConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
+                                                        customRebalanceListenerForOldConsumer: Option[ConsumerRebalanceListener])
+    extends ConsumerRebalanceListener {
+
+    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
+      producer.flush()
+      commitOffsets(mirrorMakerConsumer)
+      // invoke custom consumer rebalance listener
+      customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership))
+    }
+
+    override def beforeStartingFetchers(consumerId: String,
+                                        partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
+      customRebalanceListenerForOldConsumer.foreach(_.beforeStartingFetchers(consumerId, partitionAssignment))
+    }
+  }
+
   private class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
@@ -417,36 +601,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
-                                          customRebalanceListener: Option[ConsumerRebalanceListener])
-    extends ConsumerRebalanceListener {
-
-    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
-      producer.flush()
-      commitOffsets(connector)
-      // invoke custom consumer rebalance listener
-      if (customRebalanceListener.isDefined)
-        customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership)
-    }
-
-    override def beforeStartingFetchers(consumerId: String,
-                                        partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
-      if (customRebalanceListener.isDefined)
-        customRebalanceListener.get.beforeStartingFetchers(consumerId, partitionAssignment)
-    }
-  }
-
   /**
    * If message.handler.args is specified. A constructor that takes in a String as argument must exist.
    */
   trait MirrorMakerMessageHandler {
     def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
+    def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
   }
 
   private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
     override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
       Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message()))
     }
+
+    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key, record.value))
+    }
   }
 
 }
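
As an illustration of the extended handler trait, here is a sketch of a custom
message handler; the TopicPrefixingHandler class and its prefix argument are
hypothetical, not part of this patch. Per the trait's doc comment, the
String-argument constructor is what --message.handler.args feeds:

  import java.util
  import java.util.Collections

  import kafka.consumer.BaseConsumerRecord
  import kafka.message.MessageAndMetadata
  import kafka.tools.MirrorMaker
  import org.apache.kafka.clients.producer.ProducerRecord

  // Hypothetical handler: mirrors each record to "<prefix><source topic>".
  // Both overloads must be implemented now that the trait has a
  // BaseConsumerRecord variant alongside the MessageAndMetadata one.
  class TopicPrefixingHandler(prefix: String) extends MirrorMaker.MirrorMakerMessageHandler {

    override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] =
      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](prefix + record.topic, record.key(), record.message()))

    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] =
      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](prefix + record.topic, record.key, record.value))
  }

Such a handler would be loaded with --message.handler and --message.handler.args,
giving the class by its fully qualified name, since it is instantiated reflectively
via CoreUtils.createObject.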