You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/25 04:24:47 UTC
svn commit: r1330083 - in /incubator/kafka/trunk:
core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/tools/
core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/utils/
system_test/mirror_maker/bin/ system_test/mirror_maker/config/
Author: junrao
Date: Wed Apr 25 02:24:47 2012
New Revision: 1330083
URL: http://svn.apache.org/viewvc?rev=1330083&view=rev
Log:
Mirroring should use multiple producers; add producer retries to DefaultEventHandler; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-332
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Apr 25 02:24:47 2012
@@ -32,6 +32,17 @@ class ProducerConfig(val props: Properti
if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
+ /**
+ * If DefaultEventHandler is used, this specifies the number of times to
+ * retry if an error is encountered during send. Currently, it is only
+ * appropriate when broker.list points to a VIP. If the zk.connect option
+ * is used instead, this will not have any effect because with the zk-based
+ * producer, brokers are not re-selected upon retry. So retries would go to
+ * the same (potentially still down) broker. (KAFKA-253 will help address
+ * this.)
+ */
+ val numRetries = Utils.getInt(props, "num.retries", 0)
+
/** If both broker.list and zk.connect options are specified, throw an exception */
if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Apr 25 02:24:47 2012
@@ -47,9 +47,25 @@ private[kafka] class DefaultEventHandler
private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
if(messagesPerTopic.size > 0) {
val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
- syncProducer.multiSend(requests)
- trace("kafka producer sent messages for topics %s to broker %s:%d"
- .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
+
+ val maxAttempts = config.numRetries + 1
+ var attemptsRemaining = maxAttempts
+ var sent = false
+
+ while (attemptsRemaining > 0 && !sent) {
+ attemptsRemaining -= 1
+ try {
+ syncProducer.multiSend(requests)
+ trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)"
+ .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining))
+ sent = true
+ }
+ catch {
+ case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
+ if (attemptsRemaining == 0)
+ throw e
+ }
+ }
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Apr 25 02:24:47 2012
@@ -115,7 +115,7 @@ private[async] class ProducerSendThread[
if(events.size > 0)
handler.handle(events, underlyingProducer, serializer)
}catch {
- case e: Exception => error("Error in handling batch of " + events.size + " events", e)
+ case e => error("Error in handling batch of " + events.size + " events", e)
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala Wed Apr 25 02:24:47 2012
@@ -33,20 +33,27 @@ object MirrorMaker extends Logging {
info ("Starting mirror maker")
val parser = new OptionParser
- val consumerConfigOpt = parser.accepts("consumer-config",
+ val consumerConfigOpt = parser.accepts("consumer.config",
"Consumer config to consume from a source cluster. " +
"You may specify multiple of these.")
.withRequiredArg()
.describedAs("config file")
.ofType(classOf[String])
- val producerConfigOpt = parser.accepts("producer-config",
+ val producerConfigOpt = parser.accepts("producer.config",
"Embedded producer config.")
.withRequiredArg()
.describedAs("config file")
.ofType(classOf[String])
+
+ val numProducersOpt = parser.accepts("num.producers",
+ "Number of producer instances")
+ .withRequiredArg()
+ .describedAs("Number of producers")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
- val numStreamsOpt = parser.accepts("num-streams",
+ val numStreamsOpt = parser.accepts("num.streams",
"Number of consumption streams.")
.withRequiredArg()
.describedAs("Number of threads")
@@ -83,11 +90,11 @@ object MirrorMaker extends Logging {
val numStreams = options.valueOf(numStreamsOpt)
- val producer = {
+ val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
val config = new ProducerConfig(
Utils.loadProps(options.valueOf(producerConfigOpt)))
new Producer[Null, Message](config)
- }
+ })
val threads = {
val connectors = options.valuesOf(consumerConfigOpt).toList
@@ -97,7 +104,7 @@ object MirrorMaker extends Logging {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
connectors.foreach(_.shutdown())
- producer.close()
+ producers.foreach(_.close())
}
})
@@ -110,7 +117,7 @@ object MirrorMaker extends Logging {
connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
streams.flatten.zipWithIndex.map(streamAndIndex => {
- new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2)
+ new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
})
}
@@ -120,18 +127,20 @@ object MirrorMaker extends Logging {
}
class MirrorMakerThread(stream: KafkaStream[Message],
- producer: Producer[Null, Message],
+ producers: Seq[Producer[Null, Message]],
threadId: Int)
extends Thread with Logging {
private val shutdownLatch = new CountDownLatch(1)
private val threadName = "mirrormaker-" + threadId
+ private val producerSelector = Utils.circularIterator(producers)
this.setName(threadName)
override def run() {
try {
for (msgAndMetadata <- stream) {
+ val producer = producerSelector.next()
val pd = new ProducerData[Null, Message](
msgAndMetadata.topic, msgAndMetadata.message)
producer.send(pd)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Wed Apr 25 02:24:47 2012
@@ -668,6 +668,17 @@ object Utils extends Logging {
}
}
}
+
+ /**
+ * Create a circular (looping) iterator over a collection.
+ * @param coll An iterable over the underlying collection.
+ * @return A circular iterator over the collection.
+ */
+ def circularIterator[T](coll: Iterable[T]) = {
+ val stream: Stream[T] =
+ for (forever <- Stream.continually(1); t <- coll) yield t
+ stream.iterator
+ }
}
class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Wed Apr 25 02:24:47 2012
@@ -20,6 +20,7 @@ package kafka.utils
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import org.junit.Assert._
class UtilsTest extends JUnitSuite {
@@ -31,4 +32,23 @@ class UtilsTest extends JUnitSuite {
Utils.swallow(logger.info, throw new IllegalStateException("test"))
}
+ @Test
+ def testCircularIterator() {
+ val l = List(1, 2)
+ val itl = Utils.circularIterator(l)
+ assertEquals(1, itl.next())
+ assertEquals(2, itl.next())
+ assertEquals(1, itl.next())
+ assertEquals(2, itl.next())
+ assertFalse(itl.hasDefiniteSize)
+
+ val s = Set(1, 2)
+ val its = Utils.circularIterator(s)
+ assertEquals(1, its.next())
+ assertEquals(2, its.next())
+ assertEquals(1, its.next())
+ assertEquals(2, its.next())
+ assertEquals(1, its.next())
+ }
+
}
Modified: incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh Wed Apr 25 02:24:47 2012
@@ -241,9 +241,9 @@ test_whitelists() {
sleep 4
info "starting mirror makers"
- JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+ JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
pid_mirrormaker_1=$!
- JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+ JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
pid_mirrormaker_2=$!
begin_timer
@@ -298,7 +298,7 @@ test_blacklists() {
sleep 4
info "starting mirror maker"
- $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+ $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
pid_mirrormaker_1=$!
start_producer blacktopic01 localhost:2181
Modified: incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties (original)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties Wed Apr 25 02:24:47 2012
@@ -26,3 +26,5 @@ producer.type=async
# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1
+num.producers.per.broker=2
+