You are viewing a plain-text version of this content. (The link to the canonical version was lost in the conversion to plain text.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/25 04:24:47 UTC

svn commit: r1330083 - in /incubator/kafka/trunk: core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/tools/ core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/utils/ system_test/mirror_maker/bin/ system_test/mirror_maker/config/

Author: junrao
Date: Wed Apr 25 02:24:47 2012
New Revision: 1330083

URL: http://svn.apache.org/viewvc?rev=1330083&view=rev
Log:
Mirroring should use multiple producers; add producer retries to DefaultEventHandler; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-332

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
    incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
    incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Apr 25 02:24:47 2012
@@ -32,6 +32,20 @@ class ProducerConfig(val props: Properti
   if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
+  /**
+   * Number of times DefaultEventHandler retries a send after an error
+   * ("num.retries", default 0: a single attempt, no retries). Currently it is
+   * only appropriate when broker.list points to a VIP. With zk.connect it will
+   * not have any useful effect, because the zk-based producer does not
+   * re-select a broker on retry, so retries would go to the same (potentially
+   * still down) broker. (KAFKA-253 will help address this.)
+   */
+  val numRetries = Utils.getInt(props, "num.retries", 0)
+
+  /** A negative retry count would make the handler skip sending entirely, so reject it */
+  if(numRetries < 0)
+    throw new InvalidConfigException("num.retries cannot be negative")
+
   /** If both broker.list and zk.connect options are specified, throw an exception */
   if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
     throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Apr 25 02:24:47 2012
@@ -47,9 +47,31 @@ private[kafka] class DefaultEventHandler
   private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
-      syncProducer.multiSend(requests)
-      trace("kafka producer sent messages for topics %s to broker %s:%d"
-        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
+
+      // One initial attempt plus up to config.numRetries retries. A negative
+      // retry count is treated as zero so at least one send is always made
+      // (otherwise the loop below would never run and the batch would be dropped).
+      val maxAttempts = math.max(config.numRetries, 0) + 1
+      var attemptsRemaining = maxAttempts
+      var sent = false
+
+      while (attemptsRemaining > 0 && !sent) {
+        attemptsRemaining -= 1
+        try {
+          syncProducer.multiSend(requests)
+          trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)"
+            .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining))
+          sent = true
+        }
+        catch {
+          // Retry only on Exceptions; fatal Errors such as OutOfMemoryError propagate immediately.
+          // The last failure is rethrown so the caller still sees it once retries are exhausted.
+          case e: Exception =>
+            warn("Error sending messages, %d attempts remaining".format(attemptsRemaining), e)
+            if (attemptsRemaining == 0)
+              throw e
+        }
+      }
     }
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Apr 25 02:24:47 2012
@@ -115,7 +115,7 @@ private[async] class ProducerSendThread[
       if(events.size > 0)
         handler.handle(events, underlyingProducer, serializer)
     }catch {
-      case e: Exception => error("Error in handling batch of " + events.size + " events", e)
+      case e => error("Error in handling batch of " + events.size + " events", e)
     }
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala Wed Apr 25 02:24:47 2012
@@ -33,20 +33,27 @@ object MirrorMaker extends Logging {
     info ("Starting mirror maker")
     val parser = new OptionParser
 
-    val consumerConfigOpt = parser.accepts("consumer-config",
+    val consumerConfigOpt = parser.accepts("consumer.config",
       "Consumer config to consume from a source cluster. " +
       "You may specify multiple of these.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
 
-    val producerConfigOpt = parser.accepts("producer-config",
+    val producerConfigOpt = parser.accepts("producer.config",
       "Embedded producer config.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
+
+    val numProducersOpt = parser.accepts("num.producers",
+      "Number of producer instances")
+      .withRequiredArg()
+      .describedAs("Number of producers")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
     
-    val numStreamsOpt = parser.accepts("num-streams",
+    val numStreamsOpt = parser.accepts("num.streams",
       "Number of consumption streams.")
       .withRequiredArg()
       .describedAs("Number of threads")
@@ -83,11 +90,11 @@ object MirrorMaker extends Logging {
 
     val numStreams = options.valueOf(numStreamsOpt)
 
-    val producer = {
+    val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
       val config = new ProducerConfig(
         Utils.loadProps(options.valueOf(producerConfigOpt)))
       new Producer[Null, Message](config)
-    }
+    })
 
     val threads = {
       val connectors = options.valuesOf(consumerConfigOpt).toList
@@ -97,7 +104,7 @@ object MirrorMaker extends Logging {
       Runtime.getRuntime.addShutdownHook(new Thread() {
         override def run() {
           connectors.foreach(_.shutdown())
-          producer.close()
+          producers.foreach(_.close())
         }
       })
 
@@ -110,7 +117,7 @@ object MirrorMaker extends Logging {
         connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
 
       streams.flatten.zipWithIndex.map(streamAndIndex => {
-        new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2)
+        new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
       })
     }
 
@@ -120,18 +127,20 @@ object MirrorMaker extends Logging {
   }
 
   class MirrorMakerThread(stream: KafkaStream[Message],
-                          producer: Producer[Null, Message],
+                          producers: Seq[Producer[Null, Message]],
                           threadId: Int)
           extends Thread with Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-" + threadId
+    private val producerSelector = Utils.circularIterator(producers)
 
     this.setName(threadName)
 
     override def run() {
       try {
         for (msgAndMetadata <- stream) {
+          val producer = producerSelector.next()
           val pd = new ProducerData[Null, Message](
             msgAndMetadata.topic, msgAndMetadata.message)
           producer.send(pd)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Wed Apr 25 02:24:47 2012
@@ -668,6 +668,26 @@ object Utils extends Logging {
       }
     }
   }
+
+  /**
+   * Create a circular (looping) iterator over a collection: it yields the
+   * elements of coll in coll's iteration order, then starts again from the
+   * first element, forever. The collection is re-traversed on every cycle.
+   *
+   * An empty collection yields an empty iterator (hasNext is false). Without
+   * this guard, searching an endless repetition of an empty collection for
+   * its first element would never terminate.
+   *
+   * @param coll An iterable over the underlying collection.
+   * @return A circular iterator over the collection, or an empty iterator if
+   *         coll is empty.
+   */
+  def circularIterator[T](coll: Iterable[T]): Iterator[T] = {
+    if (coll.isEmpty)
+      Iterator.empty
+    else
+      Iterator.continually(coll).flatMap(_.iterator)
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Wed Apr 25 02:24:47 2012
@@ -20,6 +20,7 @@ package kafka.utils
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import org.junit.Assert._
 
 
 class UtilsTest extends JUnitSuite {
@@ -31,4 +32,27 @@ class UtilsTest extends JUnitSuite {
     Utils.swallow(logger.info, throw new IllegalStateException("test"))
   }
 
+  /**
+   * circularIterator should yield the elements in the collection's own
+   * iteration order and then wrap around to the first element again.
+   */
+  @Test
+  def testCircularIterator() {
+    val l = List(1, 2)
+    val itl = Utils.circularIterator(l)
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertFalse(itl.hasDefiniteSize)
+
+    // Set iteration order is unspecified, so compare against the order the
+    // set itself reports rather than hard-coding 1, 2.
+    val s = Set(1, 2)
+    val order = s.toList
+    val its = Utils.circularIterator(s)
+    for (i <- 0 until 2 * order.size + 1)
+      assertEquals(order(i % order.size), its.next())
+  }
+
 }

Modified: incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh Wed Apr 25 02:24:47 2012
@@ -241,9 +241,9 @@ test_whitelists() {
     sleep 4
 
     info "starting mirror makers"
-    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
     pid_mirrormaker_1=$!
-    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
     pid_mirrormaker_2=$!
 
     begin_timer
@@ -298,7 +298,7 @@ test_blacklists() {
     sleep 4
 
     info "starting mirror maker"
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
     pid_mirrormaker_1=$!
 
     start_producer blacktopic01 localhost:2181

Modified: incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties?rev=1330083&r1=1330082&r2=1330083&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties (original)
+++ incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties Wed Apr 25 02:24:47 2012
@@ -26,3 +26,5 @@ producer.type=async
 # to avoid dropping events if the queue is full, wait indefinitely
 queue.enqueueTimeout.ms=-1
 
+num.producers.per.broker=2
+