You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/27 16:00:19 UTC
kafka git commit: KAFKA-2452: Add new consumer option to mirror maker.
Repository: kafka
Updated Branches:
refs/heads/trunk 2e4aed707 -> 2fd645ac2
KAFKA-2452: Add new consumer option to mirror maker.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ben Stopford, Guozhang Wang
Closes #266 from becketqin/KAFKA-2452
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fd645ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fd645ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fd645ac
Branch: refs/heads/trunk
Commit: 2fd645ac2fec7cf089cb8175ee47823b67a07226
Parents: 2e4aed7
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Oct 27 07:59:52 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 27 07:59:52 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/BaseConsumer.scala | 12 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 554 ++++++++++++-------
2 files changed, 373 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 8b93493..52cd5fa 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -28,13 +28,15 @@ trait BaseConsumer {
def receive(): BaseConsumerRecord
def stop()
def cleanup()
+ def commit()
}
case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
- import scala.collection.JavaConversions._
+
+import scala.collection.JavaConversions._
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
consumer.subscribe(List(topic))
@@ -58,6 +60,10 @@ class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs:
override def cleanup() {
this.consumer.close()
}
+
+ override def commit() {
+ this.consumer.commitSync()
+ }
}
class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
@@ -81,5 +87,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
override def cleanup() {
this.consumerConnector.shutdown()
}
+
+ override def commit() {
+ this.consumerConnector.commitOffsets
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index fbe0c83..3cf754b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,22 +20,27 @@ package kafka.tools
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.regex.Pattern
import java.util.{Collections, Properties}
import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.client.ClientUtils
+import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.message.MessageAndMetadata
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
+import org.apache.kafka.clients.consumer.{ConsumerWakeupException, Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConversions._
-
+import scala.util.control.ControlThrowable
/**
* The mirror maker has the following architecture:
@@ -56,12 +61,11 @@ import scala.collection.JavaConversions._
*/
object MirrorMaker extends Logging with KafkaMetricsGroup {
- private var connectors: Seq[ZookeeperConsumerConnector] = null
private var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker.
- private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
+ private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
private var messageHandler: MirrorMakerMessageHandler = null
private var offsetCommitIntervalMs = 0
private var abortOnSendFailure: Boolean = true
@@ -78,120 +82,206 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def main(args: Array[String]) {
info("Starting mirror maker")
- val parser = new OptionParser
-
- val consumerConfigOpt = parser.accepts("consumer.config",
- "Embedded consumer config for consuming from the source cluster.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(classOf[String])
-
- val producerConfigOpt = parser.accepts("producer.config",
- "Embedded producer config.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(classOf[String])
-
- val numStreamsOpt = parser.accepts("num.streams",
- "Number of consumption streams.")
- .withRequiredArg()
- .describedAs("Number of threads")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
-
- val whitelistOpt = parser.accepts("whitelist",
- "Whitelist of topics to mirror.")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
-
- val blacklistOpt = parser.accepts("blacklist",
- "Blacklist of topics to mirror.")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
-
- val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
- "Offset commit interval in ms")
- .withRequiredArg()
- .describedAs("offset commit interval in millisecond")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(60000)
-
- val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
- "The consumer rebalance listener to use for mirror maker consumer.")
- .withRequiredArg()
- .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
- .ofType(classOf[String])
-
- val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
- "Arguments used by custom rebalance listener for mirror maker consumer")
- .withRequiredArg()
- .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
- .ofType(classOf[String])
-
- val messageHandlerOpt = parser.accepts("message.handler",
- "Message handler which will process every record in-between consumer and producer.")
- .withRequiredArg()
- .describedAs("A custom message handler of type MirrorMakerMessageHandler")
- .ofType(classOf[String])
-
- val messageHandlerArgsOpt = parser.accepts("message.handler.args",
- "Arguments used by custom rebalance listener for mirror maker consumer")
- .withRequiredArg()
- .describedAs("Arguments passed to message handler constructor.")
- .ofType(classOf[String])
-
- val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
- "Configure the mirror maker to exit on a failed send.")
- .withRequiredArg()
- .describedAs("Stop the entire mirror maker when a send failure occurs")
- .ofType(classOf[String])
- .defaultsTo("true")
-
- val helpOpt = parser.accepts("help", "Print this message.")
-
- if (args.length == 0)
- CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
-
-
- val options = parser.parse(args: _*)
-
- if (options.has(helpOpt)) {
- parser.printHelpOn(System.out)
- System.exit(0)
- }
+ try {
+ val parser = new OptionParser
+
+ val consumerConfigOpt = parser.accepts("consumer.config",
+ "Embedded consumer config for consuming from the source cluster.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(classOf[String])
+
+ val useNewConsumerOpt = parser.accepts("new.consumer",
+ "Use new consumer in mirror maker.")
+
+ val producerConfigOpt = parser.accepts("producer.config",
+ "Embedded producer config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(classOf[String])
+
+ val numStreamsOpt = parser.accepts("num.streams",
+ "Number of consumption streams.")
+ .withRequiredArg()
+ .describedAs("Number of threads")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+
+ val whitelistOpt = parser.accepts("whitelist",
+ "Whitelist of topics to mirror.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(classOf[String])
+
+ val blacklistOpt = parser.accepts("blacklist",
+ "Blacklist of topics to mirror. Only old consumer supports blacklist.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(classOf[String])
+
+ val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
+ "Offset commit interval in ms")
+ .withRequiredArg()
+ .describedAs("offset commit interval in millisecond")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(60000)
+
+ val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
+ "The consumer rebalance listener to use for mirror maker consumer.")
+ .withRequiredArg()
+ .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
+ .ofType(classOf[String])
+
+ val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
+ "Arguments used by custom rebalance listener for mirror maker consumer")
+ .withRequiredArg()
+ .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
+ .ofType(classOf[String])
+
+ val messageHandlerOpt = parser.accepts("message.handler",
+ "Message handler which will process every record in-between consumer and producer.")
+ .withRequiredArg()
+ .describedAs("A custom message handler of type MirrorMakerMessageHandler")
+ .ofType(classOf[String])
+
+ val messageHandlerArgsOpt = parser.accepts("message.handler.args",
+ "Arguments used by custom message handler for mirror maker.")
+ .withRequiredArg()
+ .describedAs("Arguments passed to message handler constructor.")
+ .ofType(classOf[String])
+
+ val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
+ "Configure the mirror maker to exit on a failed send.")
+ .withRequiredArg()
+ .describedAs("Stop the entire mirror maker when a send failure occurs")
+ .ofType(classOf[String])
+ .defaultsTo("true")
+
+ val helpOpt = parser.accepts("help", "Print this message.")
+
+ if (args.length == 0)
+ CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
+
+
+ val options = parser.parse(args: _*)
+
+ if (options.has(helpOpt)) {
+ parser.printHelpOn(System.out)
+ System.exit(0)
+ }
- CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
- if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
- println("Exactly one of whitelist or blacklist is required.")
- System.exit(1)
- }
+ CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
+ if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
+ println("Exactly one of whitelist or blacklist is required.")
+ System.exit(1)
+ }
+
+ abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
+ offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
+ val numStreams = options.valueOf(numStreamsOpt).intValue()
- abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
- offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
- val numStreams = options.valueOf(numStreamsOpt).intValue()
+ Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
+ override def run() {
+ cleanShutdown()
+ }
+ })
+
+ // create producer
+ val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+ // Defaults to no data loss settings.
+ maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
+ maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+ maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
+ maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+ // Always set producer key and value serializer to ByteArraySerializer.
+ producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+ producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+ producer = new MirrorMakerProducer(producerProps)
+
+ val useNewConsumer = options.has(useNewConsumerOpt)
+
+ // Create consumers
+ val mirrorMakerConsumers = if (!useNewConsumer) {
+ val customRebalanceListener = {
+ val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+ if (customRebalanceListenerClass != null) {
+ val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+ if (rebalanceListenerArgs != null) {
+ Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+ } else {
+ Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
+ }
+ } else {
+ None
+ }
+ }
+
+ if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
+ throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
+ createOldConsumers(
+ numStreams,
+ options.valueOf(consumerConfigOpt),
+ customRebalanceListener,
+ Option(options.valueOf(whitelistOpt)),
+ Option(options.valueOf(blacklistOpt)))
+ } else {
+ val customRebalanceListener = {
+ val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+ if (customRebalanceListenerClass != null) {
+ val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+ if (rebalanceListenerArgs != null) {
+ Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+ } else {
+ Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass))
+ }
+ } else {
+ None
+ }
+ }
+ if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
+ throw new IllegalArgumentException("The rebalance listener should be an instance of " +
+ "org.apache.kafka.clients.consumer.ConsumerRebalanceListener")
+ createNewConsumers(
+ numStreams,
+ options.valueOf(consumerConfigOpt),
+ customRebalanceListener,
+ Option(options.valueOf(whitelistOpt)))
+ }
- Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
- override def run() {
- cleanShutdown()
+ // Create mirror maker threads.
+ mirrorMakerThreads = (0 until numStreams) map (i =>
+ new MirrorMakerThread(mirrorMakerConsumers(i), i))
+
+ // Create and initialize message handler
+ val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
+ val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
+ messageHandler = {
+ if (customMessageHandlerClass != null) {
+ if (messageHandlerArgs != null)
+ CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
+ else
+ CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
+ } else {
+ defaultMirrorMakerMessageHandler
+ }
}
- })
-
- // create producer
- val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
- // Defaults to no data loss settings.
- maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
- maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
- maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
- maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
- // Always set producer key and value serializer to ByteArraySerializer.
- producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- producer = new MirrorMakerProducer(producerProps)
+ } catch {
+ case ct : ControlThrowable => throw ct
+ case t : Throwable =>
+ error("Exception when starting mirror maker.", t)
+ }
+ mirrorMakerThreads.foreach(_.start())
+ mirrorMakerThreads.foreach(_.awaitShutdown())
+ }
+
+ private def createOldConsumers(numStreams: Int,
+ consumerConfigPath: String,
+ customRebalanceListener: Option[ConsumerRebalanceListener],
+ whitelist: Option[String],
+ blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
// Create consumer connector
- val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+ val consumerConfigProps = Utils.loadProps(consumerConfigPath)
// Disable consumer auto offsets commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
// Set the consumer timeout so we will not block for low volume pipeline. The timeout is necessary to make sure
@@ -199,65 +289,52 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000")
// The default client id is group id, we manually set client id to groupId-index to avoid metric collision
val groupIdString = consumerConfigProps.getProperty("group.id")
- connectors = (0 until numStreams) map { i =>
+ val connectors = (0 until numStreams) map { i =>
consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
val consumerConfig = new ConsumerConfig(consumerConfigProps)
new ZookeeperConsumerConnector(consumerConfig)
}
- // Set consumer rebalance listener.
- // Custom rebalance listener will be invoked after internal listener finishes its work.
- val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
- val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
- val customRebalanceListener = {
- if (customRebalanceListenerClass != null) {
- if (rebalanceListenerArgs != null)
- Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
- else
- Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
- } else {
- None
- }
- }
- connectors.foreach {
- connector =>
- val consumerRebalanceListener = new InternalRebalanceListener(connector, customRebalanceListener)
- connector.setConsumerRebalanceListener(consumerRebalanceListener)
- }
-
// create filters
- val filterSpec = if (options.has(whitelistOpt))
- new Whitelist(options.valueOf(whitelistOpt))
+ val filterSpec = if (whitelist.isDefined)
+ new Whitelist(whitelist.get)
+ else if (blacklist.isDefined)
+ new Blacklist(blacklist.get)
else
- new Blacklist(options.valueOf(blacklistOpt))
-
- // Create mirror maker threads
- mirrorMakerThreads = (0 until numStreams) map ( i =>
- new MirrorMakerThread(connectors(i), filterSpec, i)
- )
-
- // Create and initialize message handler
- val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
- val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
- messageHandler = {
- if (customMessageHandlerClass != null) {
- if (messageHandlerArgs != null)
- CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
- else
- CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
- } else {
- defaultMirrorMakerMessageHandler
- }
+ throw new IllegalArgumentException("Either whitelist or blacklist should be defined!")
+ (0 until numStreams) map { i =>
+ val consumer = new MirrorMakerOldConsumer(connectors(i), filterSpec)
+ val consumerRebalanceListener = new InternalRebalanceListenerForOldConsumer(consumer, customRebalanceListener)
+ connectors(i).setConsumerRebalanceListener(consumerRebalanceListener)
+ consumer
}
+ }
- mirrorMakerThreads.foreach(_.start())
- mirrorMakerThreads.foreach(_.awaitShutdown())
+ def createNewConsumers(numStreams: Int,
+ consumerConfigPath: String,
+ customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
+ whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
+ // Create consumer connector
+ val consumerConfigProps = Utils.loadProps(consumerConfigPath)
+ // Disable consumer auto offsets commit to prevent data loss.
+ maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
+ // Hardcode the deserializer to ByteArrayDeserializer
+ consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
+ consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
+ // The default client id is group id, we manually set client id to groupId-index to avoid metric collision
+ val groupIdString = consumerConfigProps.getProperty("group.id")
+ val consumers = (0 until numStreams) map { i =>
+ consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
+ new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
+ }
+ whitelist.getOrElse(throw new IllegalArgumentException("White list cannot be empty for new consumer"))
+ consumers.map(consumer => new MirrorMakerNewConsumer(consumer, customRebalanceListener, whitelist))
}
- def commitOffsets(connector: ZookeeperConsumerConnector) {
+ def commitOffsets(mirrorMakerConsumer: MirrorMakerBaseConsumer) {
if (!exitingOnSendFailure) {
trace("Committing offsets.")
- connector.commitOffsets
+ mirrorMakerConsumer.commit()
} else {
info("Exiting on send failure, skip committing offsets.")
}
@@ -285,8 +362,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue))
}
- class MirrorMakerThread(connector: ZookeeperConsumerConnector,
- filterSpec: TopicFilter,
+ class MirrorMakerThread(mirrorMakerConsumer: MirrorMakerBaseConsumer,
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-thread-" + threadId
private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
@@ -298,25 +374,24 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def run() {
info("Starting mirror maker thread " + threadName)
+ mirrorMakerConsumer.init()
try {
- // Creating one stream per each connector instance
- val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
- require(streams.size == 1)
- val stream = streams(0)
- val iter = stream.iterator()
-
+ // We need the two nested while loops to make sure that, when the old consumer is used, we
+ // still commit offsets even when there are no messages. With the new consumer this is handled by poll(timeout).
while (!exitingOnSendFailure && !shuttingDown) {
try {
- while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
- val data = iter.next()
- trace("Sending message with value size %d".format(data.message().size))
+ while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
+ val data = mirrorMakerConsumer.receive()
+ trace("Sending message with value size %d".format(data.value.length))
val records = messageHandler.handle(data)
records.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
} catch {
- case e: ConsumerTimeoutException =>
+ case cte: ConsumerTimeoutException =>
trace("Caught ConsumerTimeoutException, continue iteration.")
+ case cwe: ConsumerWakeupException =>
+ trace("Caught ConsumerWakeupException, continue iteration.")
}
maybeFlushAndCommitOffsets()
}
@@ -327,9 +402,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Flushing producer.")
producer.flush()
info("Committing consumer offsets.")
- commitOffsets(connector)
+ commitOffsets(mirrorMakerConsumer)
info("Shutting down consumer connectors.")
- connector.shutdown()
+ mirrorMakerConsumer.stop()
+ mirrorMakerConsumer.cleanup()
shutdownLatch.countDown()
info("Mirror maker thread stopped")
// if it exits accidentally, stop the entire mirror maker
@@ -343,7 +419,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def maybeFlushAndCommitOffsets() {
if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
producer.flush()
- commitOffsets(connector)
+ commitOffsets(mirrorMakerConsumer)
lastOffsetCommitMs = System.currentTimeMillis()
}
}
@@ -352,6 +428,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
try {
info(threadName + " shutting down")
shuttingDown = true
+ mirrorMakerConsumer.stop()
}
catch {
case ie: InterruptedException =>
@@ -370,6 +447,113 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
+ private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer {
+ def init()
+ def hasData : Boolean
+ }
+
+ private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector,
+ filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
+ private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
+
+ override def init() {
+ // Creating one stream per each connector instance
+ val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
+ require(streams.size == 1)
+ val stream = streams(0)
+ iter = stream.iterator()
+ }
+
+ override def hasData = iter.hasNext()
+
+ override def receive() : BaseConsumerRecord = {
+ val messageAndMetadata = iter.next()
+ BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+ }
+
+ override def stop() {
+ // Do nothing
+ }
+
+ override def cleanup() {
+ connector.shutdown()
+ }
+
+ override def commit() {
+ connector.commitOffsets
+ }
+ }
+
+ private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+ customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
+ whitelistOpt: Option[String])
+ extends MirrorMakerBaseConsumer {
+ val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
+ var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+
+ override def init() {
+ debug("Initiating new consumer")
+ val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
+ if (whitelistOpt.isDefined)
+ consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
+ }
+
+ // New consumer always hasNext
+ override def hasData = true
+
+ override def receive() : BaseConsumerRecord = {
+ while (recordIter == null || !recordIter.hasNext)
+ recordIter = consumer.poll(1000).iterator
+
+ val record = recordIter.next()
+ BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+ }
+
+ override def stop() {
+ consumer.wakeup()
+ }
+
+ override def cleanup() {
+ ClientUtils.swallow(consumer.close())
+ }
+
+ override def commit() {
+ consumer.commitSync()
+ }
+ }
+
+ private class InternalRebalanceListenerForNewConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
+ customRebalanceListenerForNewConsumer: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])
+ extends org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
+
+ override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
+ producer.flush()
+ commitOffsets(mirrorMakerConsumer)
+ customRebalanceListenerForNewConsumer.foreach(_.onPartitionsAssigned(partitions))
+ }
+
+ override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
+ customRebalanceListenerForNewConsumer.foreach(_.onPartitionsAssigned(partitions))
+ }
+ }
+
+ private class InternalRebalanceListenerForOldConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
+ customRebalanceListenerForOldConsumer: Option[ConsumerRebalanceListener])
+ extends ConsumerRebalanceListener {
+
+ override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
+ producer.flush()
+ commitOffsets(mirrorMakerConsumer)
+ // invoke custom consumer rebalance listener
+ customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership))
+ }
+
+ override def beforeStartingFetchers(consumerId: String,
+ partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
+ customRebalanceListenerForOldConsumer.foreach(_.beforeStartingFetchers(consumerId, partitionAssignment))
+ }
+ }
+
private class MirrorMakerProducer(val producerProps: Properties) {
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
@@ -417,36 +601,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
- private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
- customRebalanceListener: Option[ConsumerRebalanceListener])
- extends ConsumerRebalanceListener {
-
- override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
- producer.flush()
- commitOffsets(connector)
- // invoke custom consumer rebalance listener
- if (customRebalanceListener.isDefined)
- customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership)
- }
-
- override def beforeStartingFetchers(consumerId: String,
- partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
- if (customRebalanceListener.isDefined)
- customRebalanceListener.get.beforeStartingFetchers(consumerId, partitionAssignment)
- }
- }
-
/**
* If message.handler.args is specified. A constructor that takes in a String as argument must exist.
*/
trait MirrorMakerMessageHandler {
def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
+ def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}
private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message()))
}
+
+ override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+ Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key, record.value))
+ }
}
}