Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/08 03:59:55 UTC
svn commit: r1241754 [1/2] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/common/ main/scala/kafka/javaapi/
main/scala/kafka/javaapi/producer/ main/scala/kafka/javaapi/producer/async/
main/scala/kafka/producer/ main/scala/kafka/producer/...
Author: junrao
Date: Wed Feb 8 02:59:54 2012
New Revision: 1241754
URL: http://svn.apache.org/viewvc?rev=1241754&view=rev
Log:
refactor the async producer; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-253
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/async/
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/QueueClosedException.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala?rev=1241754&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala Wed Feb 8 02:59:54 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates that the producer failed to deliver a message to the broker, even after the configured retries
+*/
+class FailedToSendMessageException(message: String, t: Throwable) extends RuntimeException(message, t) {
+}
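This exception is raised to callers of Producer.send once the retry loop in the new DefaultEventHandler (further down in this patch) is exhausted. A minimal handling sketch, assuming an already-constructed kafka.producer.Producer[String, String] named producer (the topic name is illustrative):

    import kafka.common.FailedToSendMessageException
    import kafka.producer.ProducerData

    try {
      producer.send(new ProducerData[String, String]("test-topic", "hello"))
    } catch {
      case e: FailedToSendMessageException =>
        // thrown only after all producer.num.retries attempts have failed;
        // decide here whether to re-enqueue the message or give up
        println("giving up: " + e.getMessage)
    }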
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Wed Feb 8 02:59:54 2012
@@ -16,8 +16,6 @@
*/
package kafka.javaapi
-import kafka.serializer.Encoder
-import kafka.producer.async.QueueItem
import kafka.utils.Logging
private[javaapi] object Implicits extends Logging {
@@ -30,91 +28,6 @@ private[javaapi] object Implicits extend
messageSet.getErrorCode)
}
- implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
- debug("Implicit instantiation of Java Sync Producer")
- new kafka.javaapi.producer.SyncProducer(producer)
- }
-
- implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
- debug("Implicit instantiation of Sync Producer")
- producer.underlying
- }
-
- implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T])
- : kafka.producer.async.EventHandler[T] = {
- new kafka.producer.async.EventHandler[T] {
- override def init(props: java.util.Properties) { eventHandler.init(props) }
- override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
- import collection.JavaConversions._
- eventHandler.handle(asList(events), producer, encoder)
- }
- override def close { eventHandler.close }
- }
- }
-
- implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T])
- : kafka.javaapi.producer.async.EventHandler[T] = {
- new kafka.javaapi.producer.async.EventHandler[T] {
- override def init(props: java.util.Properties) { eventHandler.init(props) }
- override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
- encoder: Encoder[T]) {
- import collection.JavaConversions._
- eventHandler.handle(asBuffer(events), producer, encoder)
- }
- override def close { eventHandler.close }
- }
- }
-
- implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T])
- : kafka.producer.async.CallbackHandler[T] = {
- new kafka.producer.async.CallbackHandler[T] {
- import collection.JavaConversions._
- override def init(props: java.util.Properties) { cbkHandler.init(props)}
- override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
- cbkHandler.beforeEnqueue(data)
- }
- override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
- cbkHandler.afterEnqueue(data, added)
- }
- override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
- cbkHandler.afterDequeuingExistingData(data)
- }
- override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
- asList(cbkHandler.beforeSendingData(asList(data)))
- }
- override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
- asBuffer(cbkHandler.lastBatchBeforeClose)
- }
- override def close { cbkHandler.close }
- }
- }
-
- implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
- : kafka.javaapi.producer.async.CallbackHandler[T] = {
- new kafka.javaapi.producer.async.CallbackHandler[T] {
- import collection.JavaConversions._
- override def init(props: java.util.Properties) { cbkHandler.init(props)}
- override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
- cbkHandler.beforeEnqueue(data)
- }
- override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
- cbkHandler.afterEnqueue(data, added)
- }
- override def afterDequeuingExistingData(data: QueueItem[T] = null)
- : java.util.List[QueueItem[T]] = {
- asList(cbkHandler.afterDequeuingExistingData(data))
- }
- override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
- : java.util.List[QueueItem[T]] = {
- asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
- }
- override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
- asList(cbkHandler.lastBatchBeforeClose)
- }
- override def close { cbkHandler.close }
- }
- }
-
implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
response.underlying
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Wed Feb 8 02:59:54 2012
@@ -17,82 +17,11 @@
package kafka.javaapi.producer
-import kafka.utils.Utils
-import kafka.producer.async.QueueItem
-import java.util.Properties
-import kafka.producer.{ProducerPool, ProducerConfig, Partitioner}
-import kafka.serializer.Encoder
+import kafka.producer.ProducerConfig
-class Producer[K,V](config: ProducerConfig,
- partitioner: Partitioner[K],
- producerPool: ProducerPool[V],
- populateProducerPool: Boolean = true) /* for testing purpose only. Applications should ideally */
- /* use the other constructor*/
+class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
-
- private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null)
-
- /**
- * This constructor can be used when all config parameters will be specified through the
- * ProducerConfig object
- * @param config Producer Configuration object
- */
- def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
- new ProducerPool[V](config, Utils.getObject(config.serializerClass)))
-
- /**
- * This constructor can be used to provide pre-instantiated objects for all config parameters
- * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
- * callback handler
- * @param config Producer Configuration object
- * @param encoder Encoder used to convert an object of type V to a kafka.message.Message
- * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to
- * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer
- * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject
- * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline.
- * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom
- * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
- * object in the send API
- */
- def this(config: ProducerConfig,
- encoder: Encoder[V],
- eventHandler: kafka.javaapi.producer.async.EventHandler[V],
- cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V],
- partitioner: Partitioner[K]) = {
- this(config, partitioner,
- new ProducerPool[V](config, encoder,
- new kafka.producer.async.EventHandler[V] {
- override def init(props: Properties) { eventHandler.init(props) }
- override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
- encoder: Encoder[V]) {
- import collection.JavaConversions._
- import kafka.javaapi.Implicits._
- eventHandler.handle(asList(events), producer, encoder)
- }
- override def close { eventHandler.close }
- },
- new kafka.producer.async.CallbackHandler[V] {
- import collection.JavaConversions._
- override def init(props: Properties) { cbkHandler.init(props)}
- override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = {
- cbkHandler.beforeEnqueue(data)
- }
- override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) {
- cbkHandler.afterEnqueue(data, added)
- }
- override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
- cbkHandler.afterDequeuingExistingData(data)
- }
- override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
- asList(cbkHandler.beforeSendingData(asList(data)))
- }
- override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
- asBuffer(cbkHandler.lastBatchBeforeClose)
- }
- override def close { cbkHandler.close }
- }))
- }
-
+ def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
/**
* Sends the data to a single topic, partitioned by key, using either the
* synchronous or the asynchronous producer
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala Wed Feb 8 02:59:54 2012
@@ -18,14 +18,16 @@ package kafka.javaapi.producer
import scala.collection.JavaConversions._
-class ProducerData[K, V](private val topic: String,
- private val key: K,
- private val data: java.util.List[V]) {
+case class ProducerData[K, V](topic: String,
+ key: K,
+ data: java.util.List[V]) {
def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
+ def this(t: String, k: K, d: V) = this(topic = t, key = k, data = asList(List(d)))
+
def getTopic: String = topic
def getKey: K = key
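With the constructor surface above, the Java API reduces to a ProducerConfig plus ProducerData. A minimal sketch of the new usage, assuming a reachable broker (the broker.list value and topic are illustrative; partitioner.class is set explicitly because its default becomes null in the ProducerConfig change below):

    import java.util.Properties
    import kafka.producer.ProducerConfig
    import kafka.javaapi.producer.{Producer, ProducerData}

    val props = new Properties()
    props.put("broker.list", "0:localhost:9092")          // brokerId:host:port
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", "kafka.producer.DefaultPartitioner")

    val producer = new Producer[String, String](new ProducerConfig(props))
    // the new three-argument constructor: topic, key, single message
    producer.send(new ProducerData[String, String]("test-topic", "key1", "message1"))
    producer.close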
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Wed Feb 8 02:59:54 2012
@@ -16,7 +16,7 @@
*/
package kafka.producer
-import collection.immutable.Map
+import collection.Map
import collection.SortedSet
import kafka.cluster.{Broker, Partition}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Wed Feb 8 02:59:54 2012
@@ -17,10 +17,11 @@
package kafka.producer
import collection.mutable.HashMap
-import collection.immutable.Map
+import collection.Map
import collection.SortedSet
import kafka.cluster.{Broker, Partition}
import kafka.common.InvalidConfigException
+import kafka.api.ProducerRequest
private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo {
private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo
@@ -66,13 +67,11 @@ private[producer] class ConfigBrokerPart
val brokerInfo = bInfo.split(":")
if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")
}
- val brokerPartitions = brokerInfoList.map(bInfo => (bInfo.split(":").head.toInt, 1))
+ val brokerIds = brokerInfoList.map(bInfo => bInfo.split(":").head.toInt)
var brokerParts = SortedSet.empty[Partition]
- brokerPartitions.foreach { bp =>
- for(i <- 0 until bp._2) {
- val bidPid = new Partition(bp._1, i)
- brokerParts = brokerParts + bidPid
- }
+ brokerIds.foreach { bid =>
+ val bidPid = new Partition(bid, ProducerRequest.RandomPartition)
+ brokerParts += bidPid
}
brokerParts
}
@@ -90,7 +89,7 @@ private[producer] class ConfigBrokerPart
brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
brokerIdHostPort(1), brokerIdHostPort(2).toInt))
}
- brokerInfo.toMap
+ brokerInfo
}
-}
+}
\ No newline at end of file
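The effect of this change: each entry in a static broker.list now yields a single Partition(brokerId, ProducerRequest.RandomPartition) rather than one Partition per locally-assumed partition count; the actual partition is chosen at send time. The entry format itself is unchanged. A small illustration of the parsing above (the hosts are made up):

    // each broker.list entry is brokerId:host:port
    val brokerList = "0:broker0.example.com:9092,1:broker1.example.com:9092"
    val brokerIds = brokerList.split(",").map(_.split(":").head.toInt)   // Array(0, 1)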
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Wed Feb 8 02:59:54 2012
@@ -16,82 +16,51 @@
*/
package kafka.producer
-import async.{CallbackHandler, EventHandler}
-import kafka.serializer.Encoder
+import async._
import kafka.utils._
-import java.util.Properties
-import kafka.cluster.{Partition, Broker}
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
-import kafka.api.ProducerRequest
+import kafka.common.InvalidConfigException
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
+import kafka.serializer.Encoder
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
class Producer[K,V](config: ProducerConfig,
- partitioner: Partitioner[K],
- producerPool: ProducerPool[V],
- populateProducerPool: Boolean,
- private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
- /* use the other constructor*/
+ private val eventHandler: EventHandler[K,V]) // for testing only
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
- warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
- private val random = new java.util.Random
- // check if zookeeper based auto partition discovery is enabled
- private val zkEnabled = Utils.propertyExists(config.zkConnect)
- if(brokerPartitionInfo == null) {
- zkEnabled match {
- case true =>
- val zkProps = new Properties()
- zkProps.put("zk.connect", config.zkConnect)
- zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
- zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
- zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
- brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
- case false =>
- brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
- }
- }
- // pool of producers, one per broker
- if(populateProducerPool) {
- val allBrokers = brokerPartitionInfo.getAllBrokerInfo
- allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+ throw new InvalidConfigException("Only one of zk.connect and broker.list should be provided")
+ if (config.batchSize > config.queueSize)
+ throw new InvalidConfigException("Batch size can't be larger than queue size.")
+
+ private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
+ private var sync: Boolean = true
+ private var producerSendThread: ProducerSendThread[K,V] = null
+ config.producerType match {
+ case "sync" =>
+ case "async" =>
+ sync = false
+ val asyncProducerID = Utils.getNextRandomInt
+ producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
+ eventHandler, config.queueTime, config.batchSize)
+ producerSendThread.start
+ case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
-/**
- * This constructor can be used when all config parameters will be specified through the
- * ProducerConfig object
- * @param config Producer Configuration object
- */
- def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
- new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
-
/**
- * This constructor can be used to provide pre-instantiated objects for all config parameters
- * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
- * callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner
- * will not be picked up from the config.
+ * This constructor can be used when all config parameters will be specified through the
+ * ProducerConfig object
* @param config Producer Configuration object
- * @param encoder Encoder used to convert an object of type V to a kafka.message.Message. If this is null it
- * throws an InvalidConfigException
- * @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to
- * dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it
- * uses the DefaultEventHandler
- * @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject
- * callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does
- * not use the callback handler and hence does not invoke any callbacks
- * @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom
- * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
- * object in the send API. If this is null, producer uses DefaultPartitioner
*/
- def this(config: ProducerConfig,
- encoder: Encoder[V],
- eventHandler: EventHandler[V],
- cbkHandler: CallbackHandler[V],
- partitioner: Partitioner[K]) =
- this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
- new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+ def this(config: ProducerConfig) =
+ this(config,
+ new DefaultEventHandler[K,V](config,
+ Utils.getObject[Partitioner[K]](config.partitionerClass),
+ Utils.getObject[Encoder[V]](config.serializerClass),
+ new ProducerPool(config),
+ populateProducerPool= true,
+ brokerPartitionInfo= null))
/**
* Sends the data, partitioned by key to the topic using either the
@@ -99,108 +68,49 @@ extends Logging {
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
def send(producerData: ProducerData[K,V]*) {
- zkEnabled match {
- case true => zkSend(producerData: _*)
- case false => configSend(producerData: _*)
+ if (hasShutdown.get)
+ throw new ProducerClosedException
+ recordStats(producerData: _*)
+ sync match {
+ case true => eventHandler.handle(producerData)
+ case false => asyncSend(producerData: _*)
}
}
- private def zkSend(producerData: ProducerData[K,V]*) {
- val producerPoolRequests = producerData.map { pd =>
- var brokerIdPartition: Option[Partition] = None
- var brokerInfoOpt: Option[Broker] = None
-
- var numRetries: Int = 0
- while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
- if(numRetries > 0) {
- info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
- brokerPartitionInfo.updateInfo
- }
-
- val topicPartitionsList = getPartitionListForTopic(pd)
- val totalNumPartitions = topicPartitionsList.length
-
- val partitionId = getPartition(pd.getKey, totalNumPartitions)
- brokerIdPartition = Some(topicPartitionsList(partitionId))
- brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
- numRetries += 1
+ private def recordStats(producerData: ProducerData[K,V]*) {
+ for (data <- producerData)
+ ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size)
+ }
+
+ private def asyncSend(producerData: ProducerData[K,V]*) {
+ for (data <- producerData) {
+ val added = config.enqueueTimeoutMs match {
+ case 0 =>
+ queue.offer(data)
+ case _ =>
+ try {
+ config.enqueueTimeoutMs < 0 match {
+ case true =>
+ queue.put(data)
+ true
+ case _ =>
+ queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ }
+ }
+ catch {
+ case e: InterruptedException =>
+ false
+ }
}
-
- brokerInfoOpt match {
- case Some(brokerInfo) =>
- debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
- " on partition " + brokerIdPartition.get.partId)
- case None =>
- throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
- pd.getTopic + " and key: " + pd.getKey)
+ if(!added) {
+ AsyncProducerStats.recordDroppedEvents
+ error("Event queue is full of unsent messages, could not send event: " + data.toString)
+ throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
+ }else {
+ trace("Added to send queue an event: " + data.toString)
+ trace("Remaining queue size: " + queue.remainingCapacity)
}
- producerPool.getProducerPoolData(pd.getTopic,
- new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
- pd.getData)
}
- producerPool.send(producerPoolRequests: _*)
- }
-
- private def configSend(producerData: ProducerData[K,V]*) {
- val producerPoolRequests = producerData.map { pd =>
- // find the broker partitions registered for this topic
- val topicPartitionsList = getPartitionListForTopic(pd)
- val totalNumPartitions = topicPartitionsList.length
-
- val randomBrokerId = random.nextInt(totalNumPartitions)
- val brokerIdPartition = topicPartitionsList(randomBrokerId)
- val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-
- debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
- " on a randomly chosen partition")
- val partition = ProducerRequest.RandomPartition
- debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
- brokerIdPartition.partId)
- producerPool.getProducerPoolData(pd.getTopic,
- new Partition(brokerIdPartition.brokerId, partition),
- pd.getData)
- }
- producerPool.send(producerPoolRequests: _*)
- }
-
- private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
- debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
- val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
- debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
- val totalNumPartitions = topicPartitionsList.length
- if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
- topicPartitionsList
- }
-
- /**
- * Retrieves the partition id and throws an InvalidPartitionException if
- * the value of partition is not between 0 and numPartitions-1
- * @param key the partition key
- * @param numPartitions the total number of available partitions
- * @returns the partition id
- */
- private def getPartition(key: K, numPartitions: Int): Int = {
- if(numPartitions <= 0)
- throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
- "\n Valid values are > 0")
- val partition = if(key == null) random.nextInt(numPartitions)
- else partitioner.partition(key , numPartitions)
- if(partition < 0 || partition >= numPartitions)
- throw new InvalidPartitionException("Invalid partition id : " + partition +
- "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
- partition
- }
-
- /**
- * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
- * on registration of new broker in zookeeper
- * @param bid the id of the broker
- * @param host the hostname of the broker
- * @param port the port of the broker
- */
- private def producerCbk(bid: Int, host: String, port: Int) = {
- if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
- else debug("Skipping the callback since populateProducerPool = false")
}
/**
@@ -210,8 +120,38 @@ extends Logging {
def close() = {
val canShutdown = hasShutdown.compareAndSet(false, true)
if(canShutdown) {
- producerPool.close
- brokerPartitionInfo.close
+ if (producerSendThread != null)
+ producerSendThread.shutdown
+ eventHandler.close
+ }
+ }
+}
+
+trait ProducerTopicStatMBean {
+ def getMessagesPerTopic: Long
+}
+
+@threadsafe
+class ProducerTopicStat extends ProducerTopicStatMBean {
+ private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+
+ def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
+
+ def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+}
+
+object ProducerTopicStat extends Logging {
+ private val stats = new Pool[String, ProducerTopicStat]
+
+ def getProducerTopicStat(topic: String): ProducerTopicStat = {
+ var stat = stats.get(topic)
+ if (stat == null) {
+ stat = new ProducerTopicStat
+ if (stats.putIfNotExists(topic, stat) == null)
+ Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." + topic)
+ else
+ stat = stats.get(topic)
}
+ return stat
}
}
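Two behavioral notes on the rewritten Producer: sync mode now invokes the event handler inline on the caller's thread, while async mode enqueues onto a bounded LinkedBlockingQueue drained by ProducerSendThread. The enqueueTimeoutMs config is interpreted three ways in asyncSend: 0 drops immediately when the queue is full, a negative value blocks until space frees up, and a positive value waits at most that many milliseconds (an InterruptedException while waiting counts as a failed enqueue). A standalone sketch of the same decision, with queue and enqueueTimeoutMs standing in for the producer's internals:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    val queue = new LinkedBlockingQueue[String](10000)

    def tryEnqueue(data: String, enqueueTimeoutMs: Int): Boolean =
      enqueueTimeoutMs match {
        case 0 =>
          queue.offer(data)                              // full queue: fail immediately
        case t if t < 0 =>
          queue.put(data)                                // block until space frees up
          true
        case t =>
          queue.offer(data, t, TimeUnit.MILLISECONDS)    // bounded wait
      }

A failed enqueue is counted in AsyncProducerStats and surfaces to the caller as a QueueFullException.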
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala?rev=1241754&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala Wed Feb 8 02:59:54 2012
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+class ProducerClosedException() extends RuntimeException("producer already closed") {
+}
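Since hasShutdown is checked at the top of send, a closed producer now fails fast instead of quietly enqueueing into a dead pipeline. Continuing the kafka.producer.Producer sketch above (producer and topic are illustrative):

    producer.close
    producer.send(new ProducerData[String, String]("test-topic", "too late"))   // throws ProducerClosedException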
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Feb 8 02:59:54 2012
@@ -17,13 +17,13 @@
package kafka.producer
-import async.AsyncProducerConfigShared
+import async.AsyncProducerConfig
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.common.InvalidConfigException
class ProducerConfig(val props: Properties) extends ZKConfig(props)
- with AsyncProducerConfigShared with SyncProducerConfigShared{
+ with AsyncProducerConfig with SyncProducerConfigShared{
/** For bypassing zookeeper based auto partition discovery, use this config *
* to pass in static broker and per-broker partition information. Format- *
@@ -37,7 +37,7 @@ class ProducerConfig(val props: Properti
throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
/** the partitioner class for partitioning events amongst sub-topics */
- val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+ val partitionerClass = Utils.getString(props, "partitioner.class", null)
/** this parameter specifies whether the messages are sent asynchronously *
* or not. Valid values are - async for asynchronous send *
@@ -71,5 +71,7 @@ class ProducerConfig(val props: Properti
* ZK cache needs to be updated.
* This parameter specifies the number of times the producer attempts to refresh this ZK cache.
*/
- val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
+ val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
+
+ val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5)
}
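Retry behavior thus moves out of the ZK layer (zk.read.num.retries) and into the producer proper. A configuration sketch; the values shown are the defaults, listed for illustration rather than as tuning advice:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")            // or broker.list, but not both
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "async")                  // "sync" is the default
    props.put("producer.num.retries", "3")               // replaces zk.read.num.retries
    props.put("producer.retry.backoff.ms", "5")          // sleep between retries, in ms
    val config = new ProducerConfig(props)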
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala Wed Feb 8 02:59:54 2012
@@ -23,14 +23,16 @@ package kafka.producer
* @param key the key used by the partitioner to pick a broker partition
* @param data variable length data to be published as Kafka messages under topic
*/
-class ProducerData[K, V](private val topic: String,
- private val key: K,
- private val data: Seq[V]) {
+case class ProducerData[K,V](topic: String,
+ key: K,
+ data: Seq[V]) {
def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d))
+ def this(t: String, k: K, d: V) = this(topic = t, key = k, data = List(d))
+
def getTopic: String = topic
def getKey: K = key
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Wed Feb 8 02:59:54 2012
@@ -17,163 +17,42 @@
package kafka.producer
-import async._
import java.util.Properties
-import kafka.serializer.Encoder
-import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
-import kafka.cluster.{Partition, Broker}
-import kafka.api.ProducerRequest
-import kafka.common.{UnavailableProducerException, InvalidConfigException}
-import kafka.utils.{Utils, Logging}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-
-class ProducerPool[V](private val config: ProducerConfig,
- private val serializer: Encoder[V],
- private val syncProducers: ConcurrentMap[Int, SyncProducer],
- private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
- private val inputEventHandler: EventHandler[V] = null,
- private val cbkHandler: CallbackHandler[V] = null) extends Logging {
-
- private var eventHandler = inputEventHandler
- if(eventHandler == null)
- eventHandler = new DefaultEventHandler(config, cbkHandler)
-
- if(serializer == null)
- throw new InvalidConfigException("serializer passed in is null!")
-
- private var sync: Boolean = true
- config.producerType match {
- case "sync" =>
- case "async" => sync = false
- case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
- }
+import kafka.cluster.Broker
+import kafka.utils.Logging
+import java.util.concurrent.ConcurrentHashMap
+
+class ProducerPool(private val config: ProducerConfig) extends Logging {
+ private val syncProducers = new ConcurrentHashMap[Int, SyncProducer]
- def this(config: ProducerConfig, serializer: Encoder[V],
- eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) =
- this(config, serializer,
- new ConcurrentHashMap[Int, SyncProducer](),
- new ConcurrentHashMap[Int, AsyncProducer[V]](),
- eventHandler, cbkHandler)
-
- def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer,
- new ConcurrentHashMap[Int, SyncProducer](),
- new ConcurrentHashMap[Int, AsyncProducer[V]](),
- Utils.getObject(config.eventHandler),
- Utils.getObject(config.cbkHandler))
- /**
- * add a new producer, either synchronous or asynchronous, connecting
- * to the specified broker
- * @param bid the id of the broker
- * @param host the hostname of the broker
- * @param port the port of the broker
- */
def addProducer(broker: Broker) {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
props.putAll(config.props)
- if(sync) {
- val producer = new SyncProducer(new SyncProducerConfig(props))
- info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
- syncProducers.put(broker.id, producer)
- } else {
- val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
- new SyncProducer(new SyncProducerConfig(props)),
- serializer,
- eventHandler, config.eventHandlerProps,
- cbkHandler, config.cbkHandlerProps)
- producer.start
- info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
- asyncProducers.put(broker.id, producer)
- }
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+ syncProducers.put(broker.id, producer)
}
/**
- * selects either a synchronous or an asynchronous producer, for
- * the specified broker id and calls the send API on the selected
- * producer to publish the data to the specified broker partition
- * @param poolData the producer pool request object
+ * For testing purposes
*/
- def send(poolData: ProducerPoolData[V]*) {
- val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
- var remainingRequests = poolData.toSeq
- distinctBrokers.foreach { bid =>
- val requestsForThisBid = remainingRequests partition (_.getBidPid.brokerId == bid)
- remainingRequests = requestsForThisBid._2
-
- if(sync) {
- val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
- new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
- messages = req.getData.map(d => serializer.toMessage(d)): _*)))
- debug("Fetching sync producer for broker id: " + bid)
- val producer = syncProducers.get(bid)
- if(producer != null) {
- if(producerRequests.size > 1)
- producer.multiSend(producerRequests.toArray)
- else
- producer.send(topic = producerRequests(0).topic,
- partition = producerRequests(0).partition,
- messages = producerRequests(0).messages)
- config.compressionCodec match {
- case NoCompressionCodec => debug("Sending message to broker " + bid)
- case _ => debug("Sending compressed messages to broker " + bid)
- }
- }else
- throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
- "Sync Producer for broker " + bid + " does not exist in the pool")
- }else {
- debug("Fetching async producer for broker id: " + bid)
- val producer = asyncProducers.get(bid)
- if(producer != null) {
- requestsForThisBid._1.foreach { req =>
- req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId))
- }
- if(logger.isDebugEnabled)
- config.compressionCodec match {
- case NoCompressionCodec => debug("Sending message")
- case _ => debug("Sending compressed messages")
- }
- }
- else
- throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
- "Async Producer for broker " + bid + " does not exist in the pool")
- }
- }
+ def addProducer(brokerId: Int, syncProducer: SyncProducer) {
+ syncProducers.put(brokerId, syncProducer)
}
- /**
- * Closes all the producers in the pool
- */
- def close() = {
- config.producerType match {
- case "sync" =>
- info("Closing all sync producers")
- val iter = syncProducers.values.iterator
- while(iter.hasNext)
- iter.next.close
- case "async" =>
- info("Closing all async producers")
- val iter = asyncProducers.values.iterator
- while(iter.hasNext)
- iter.next.close
- }
+ def getProducer(brokerId: Int) : SyncProducer = {
+ syncProducers.get(brokerId)
}
/**
- * This constructs and returns the request object for the producer pool
- * @param topic the topic to which the data should be published
- * @param bidPid the broker id and partition id
- * @param data the data to be published
+ * Closes all the producers in the pool
*/
- def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = {
- new ProducerPoolData[V](topic, bidPid, data)
- }
-
- class ProducerPoolData[V](topic: String,
- bidPid: Partition,
- data: Seq[V]) {
- def getTopic: String = topic
- def getBidPid: Partition = bidPid
- def getData: Seq[V] = data
+ def close() = {
+ info("Closing all sync producers")
+ val iter = syncProducers.values.iterator
+ while(iter.hasNext)
+ iter.next.close
}
}
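ProducerPool is now just a registry of per-broker SyncProducers; batching and async dispatch have moved up into Producer and ProducerSendThread. A minimal sketch of the surviving surface, assuming the config from the previous sketch and a reachable broker (the Broker arguments mirror the (id, creatorId, host, port) call pattern used elsewhere in this patch):

    import kafka.cluster.Broker
    import kafka.producer.ProducerPool

    val pool = new ProducerPool(config)
    pool.addProducer(new Broker(0, "localhost", "localhost", 9092))
    val syncProducer = pool.getProducer(0)               // one SyncProducer per broker id
    // syncProducer.send(...) / syncProducer.multiSend(...)
    pool.close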
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Wed Feb 8 02:59:54 2012
@@ -18,13 +18,8 @@ package kafka.producer.async
import java.util.Properties
import kafka.utils.Utils
-import kafka.producer.SyncProducerConfig
-class AsyncProducerConfig(override val props: Properties) extends SyncProducerConfig(props)
- with AsyncProducerConfigShared {
-}
-
-trait AsyncProducerConfigShared {
+trait AsyncProducerConfig {
val props: Properties
/* maximum time, in milliseconds, for buffering data on the producer queue */
@@ -46,16 +41,4 @@ trait AsyncProducerConfigShared {
/** the serializer class for events */
val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
-
- /** the callback handler for one or multiple events */
- val cbkHandler = Utils.getString(props, "callback.handler", null)
-
- /** properties required to initialize the callback handler */
- val cbkHandlerProps = Utils.getProps(props, "callback.handler.props", null)
-
- /** the handler for events */
- val eventHandler = Utils.getString(props, "event.handler", null)
-
- /** properties required to initialize the callback handler */
- val eventHandlerProps = Utils.getProps(props, "event.handler.props", null)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Wed Feb 8 02:59:54 2012
@@ -18,33 +18,21 @@
package kafka.producer.async
import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.BlockingQueue
-import org.apache.log4j.Logger
import kafka.utils.Utils
class AsyncProducerStats extends AsyncProducerStatsMBean {
val droppedEvents = new AtomicInteger(0)
- val numEvents = new AtomicInteger(0)
-
- def getAsyncProducerEvents: Int = numEvents.get
def getAsyncProducerDroppedEvents: Int = droppedEvents.get
def recordDroppedEvents = droppedEvents.getAndAdd(1)
-
- def recordEvent = numEvents.getAndAdd(1)
-}
-
-class AsyncProducerQueueSizeStats[T](private val queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerQueueSizeStatsMBean {
- def getAsyncProducerQueueSize: Int = queue.size
}
object AsyncProducerStats {
- private val logger = Logger.getLogger(getClass())
private val stats = new AsyncProducerStats
- Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName)
+ val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
- def recordDroppedEvents = stats.recordDroppedEvents
+ Utils.registerMBean(stats, ProducerMBeanName)
- def recordEvent = stats.recordEvent
+ def recordDroppedEvents = stats.recordDroppedEvents
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala Wed Feb 8 02:59:54 2012
@@ -18,7 +18,6 @@
package kafka.producer.async
trait AsyncProducerStatsMBean {
- def getAsyncProducerEvents: Int
def getAsyncProducerDroppedEvents: Int
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Feb 8 02:59:54 2012
@@ -17,45 +17,175 @@
package kafka.producer.async
-import collection.mutable.HashMap
-import collection.mutable.Map
import kafka.api.ProducerRequest
import kafka.serializer.Encoder
import java.util.Properties
-import kafka.utils.Logging
-import kafka.producer.{ProducerConfig, SyncProducer}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import kafka.producer._
+import kafka.utils.{ZKConfig, Utils, Logging}
+import kafka.cluster.{Partition, Broker}
+import collection.mutable.{ListBuffer, HashMap}
+import scala.collection.Map
+import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException}
+import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+
+class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
+ private val partitioner: Partitioner[K], // use the other constructor
+ private val encoder: Encoder[V],
+ private val producerPool: ProducerPool,
+ private val populateProducerPool: Boolean,
+ private var brokerPartitionInfo: BrokerPartitionInfo)
+ extends EventHandler[K,V] with Logging {
+
+ private val lock = new Object()
+ private val zkEnabled = Utils.propertyExists(config.zkConnect)
+ if(brokerPartitionInfo == null) {
+ zkEnabled match {
+ case true =>
+ val zkProps = new Properties()
+ zkProps.put("zk.connect", config.zkConnect)
+ zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
+ zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
+ zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
+ brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
+ case false =>
+ brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+ }
+ }
+
+ // pool of producers, one per broker
+ if(populateProducerPool) {
+ val allBrokers = brokerPartitionInfo.getAllBrokerInfo
+ allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+ }
+
+ /**
+ * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
+ * on registration of new broker in zookeeper
+ * @param bid the id of the broker
+ * @param host the hostname of the broker
+ * @param port the port of the broker
+ */
+ private def producerCbk(bid: Int, host: String, port: Int) = {
+ if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
+ else debug("Skipping the callback since populateProducerPool = false")
+ }
+ def handle(events: Seq[ProducerData[K,V]]) {
+ lock synchronized {
+ val serializedData = serialize(events)
+ handleSerializedData(serializedData, config.producerRetries)
+ }
+ }
+
+ private def handleSerializedData(messages: Seq[ProducerData[K,Message]], requiredRetries: Int) {
+ val partitionedData = partitionAndCollate(messages)
+ for ( (brokerid, eventsPerBrokerMap) <- partitionedData) {
+ if (logger.isTraceEnabled)
+ eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partition: %d"
+ .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+ val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+
+ try {
+ send(brokerid, messageSetPerBroker)
+ }
+ catch {
+ case t =>
+ warn("error sending data to broker " + brokerid, t)
+ var numRetries = 0
+ val eventsPerBroker = new ListBuffer[ProducerData[K,Message]]
+ eventsPerBrokerMap.foreach(e => eventsPerBroker.appendAll(e._2))
+ while (numRetries < requiredRetries) {
+ numRetries +=1
+ Thread.sleep(config.producerRetryBackoffMs)
+ try {
+ brokerPartitionInfo.updateInfo
+ handleSerializedData(eventsPerBroker, 0)
+ return
+ }
+ catch {
+ case t => warn("error sending data to broker " + brokerid + " in " + numRetries + " retry", t)
+ }
+ }
+ throw new FailedToSendMessageException("can't send data after " + numRetries + " retries", t)
+ }
+ }
+ }
-private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
- val cbkHandler: CallbackHandler[T]) extends EventHandler[T] with Logging {
+ def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
+ events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
+ }
- override def init(props: Properties) { }
+ def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
+ val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
+ for (event <- events) {
+ val topicPartitionsList = getPartitionListForTopic(event)
+ val totalNumPartitions = topicPartitionsList.length
+
+ val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+ val brokerPartition = topicPartitionsList(partitionIndex)
+
+ var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+ ret.get(brokerPartition.brokerId) match {
+ case Some(element) =>
+ dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+ case None =>
+ dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+ ret.put(brokerPartition.brokerId, dataPerBroker)
+ }
- override def handle(events: Seq[QueueItem[T]], syncProducer: SyncProducer, serializer: Encoder[T]) {
- var processedEvents = events
- if(cbkHandler != null)
- processedEvents = cbkHandler.beforeSendingData(events)
+ val topicAndPartition = (event.getTopic, brokerPartition.partId)
+ var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+ dataPerBroker.get(topicAndPartition) match {
+ case Some(element) =>
+ dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+ case None =>
+ dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+ dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+ }
+ dataPerTopicPartition.append(event)
+ }
+ ret
+ }
- if(logger.isTraceEnabled)
- processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
- .format(event.getTopic, event.getPartition)))
+ private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
+ debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+ val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+ debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+ val totalNumPartitions = topicPartitionsList.length
+ if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+ topicPartitionsList
+ }
- send(serialize(collate(processedEvents), serializer), syncProducer)
+ /**
+ * Retrieves the partition id and throws an InvalidPartitionException if
+ * the value of partition is not between 0 and numPartitions-1
+ * @param key the partition key
+ * @param numPartitions the total number of available partitions
+ * @returns the partition id
+ */
+ private def getPartition(key: K, numPartitions: Int): Int = {
+ if(numPartitions <= 0)
+ throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
+ "\n Valid values are > 0")
+ val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
+ else partitioner.partition(key , numPartitions)
+ if(partition < 0 || partition >= numPartitions)
+ throw new InvalidPartitionException("Invalid partition id : " + partition +
+ "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+ partition
}
- private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
+ private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
if(messagesPerTopic.size > 0) {
val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+ val syncProducer = producerPool.getProducer(brokerId)
syncProducer.multiSend(requests)
trace("kafka producer sent messages for topics %s to broker %s:%d"
.format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
}
}
- private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
- serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
- val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
+ private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = {
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
@@ -63,55 +193,49 @@ private[kafka] class DefaultEventHandler
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
- val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
- ((topicAndEvents._1._1, topicAndEvents._1._2),
- config.compressionCodec match {
- case NoCompressionCodec =>
- trace("Sending %d messages with no compression to topic %s on partition %d"
- .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2))
- new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
- trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
- new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
- case _ =>
- if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+ val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
+ {
+ val topicAndPartition = e._1
+ val produceData = e._2
+ val messages = new ListBuffer[Message]
+ produceData.foreach(p => messages.appendAll(p.getData))
+
+ ( topicAndPartition,
+ config.compressionCodec match {
+ case NoCompressionCodec =>
+ trace("Sending %d messages with no compression to topic %s on partition %d"
+ .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+ new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ case _ =>
+ config.compressedTopics.size match {
+ case 0 =>
trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
- new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
- }
- else {
- trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
- .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1,
- config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
- }
- }
- })
- }
- messagesPerTopicPartition
- }
-
- private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
- val collatedEvents = new HashMap[(String, Int), Seq[T]]
- val distinctTopics = events.map(e => e.getTopic).toSeq.distinct
- val distinctPartitions = events.map(e => e.getPartition).distinct
-
- var remainingEvents = events
- distinctTopics foreach { topic =>
- val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
- remainingEvents = topicEvents._2
- distinctPartitions.foreach { p =>
- val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
- if(topicPartitionEvents.size > 0)
- collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData))
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+ case _ =>
+ if(config.compressedTopics.contains(topicAndPartition._1)) {
+ trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+ }
+ else {
+ trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+ .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
+ config.compressedTopics.toString))
+ new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ }
+ }
+ }
+ )
}
}
- collatedEvents
+ messagesPerTopicPartition
}
- override def close = {
+ def close() {
+ if (producerPool != null)
+ producerPool.close
+ if (brokerPartitionInfo != null)
+ brokerPartitionInfo.close
}
}
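
The compressed.topics branching in groupMessagesToSet collapses to one rule: send uncompressed when the codec is NoCompressionCodec or the topic is absent from a non-empty compressed.topics list; otherwise use the configured codec. A small sketch of just that rule, modelling codecs as plain Ints (0 for NoCompressionCodec) rather than the real CompressionCodec hierarchy:

    object CompressionRuleSketch {
      // 0 stands in for NoCompressionCodec; any other value for a real codec id
      def codecForTopic(topic: String, configuredCodec: Int,
                        compressedTopics: Seq[String]): Int =
        if(configuredCodec == 0) 0                         // compression globally off
        else if(compressedTopics.isEmpty) configuredCodec  // empty list: compress all
        else if(compressedTopics.contains(topic)) configuredCodec
        else 0                                             // topic not whitelisted

      def main(args: Array[String]) {
        println(codecForTopic("events", 1, Seq("events")))  // 1: compressed
        println(codecForTopic("logs", 1, Seq("events")))    // 0: not in the list
        println(codecForTopic("logs", 1, Seq()))            // 1: empty list, compress all
      }
    }
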
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala Wed Feb 8 02:59:54 2012
@@ -16,30 +16,21 @@
*/
package kafka.producer.async
-import java.util.Properties
-import kafka.producer.SyncProducer
-import kafka.serializer.Encoder
+import kafka.producer.ProducerData
/**
- * Handler that dispatches the batched data from the queue of the
- * asynchronous producer.
+ * Handler that dispatches the batched data from the queue.
*/
-trait EventHandler[T] {
- /**
- * Initializes the event handler using a Properties object
- * @param props the properties used to initialize the event handler
- */
- def init(props: Properties) {}
+trait EventHandler[K,V] {
/**
* Callback to dispatch the batched data and send it to a Kafka server
* @param events the data sent to the producer
- * @param producer the low-level producer used to send the data
*/
- def handle(events: Seq[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T])
+ def handle(events: Seq[ProducerData[K,V]])
/**
* Cleans up and shuts down the event handler
*/
- def close {}
+ def close
}
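
Narrowing the trait to handle plus close makes alternate handlers cheap to write, e.g. for tests. A sketch of a handler that only records the batches it is given; the ProducerData and EventHandler definitions below are simplified local stand-ins for the real kafka.producer types, and MockEventHandler is hypothetical:

    import scala.collection.mutable.ListBuffer

    // Simplified local stand-ins for kafka.producer.ProducerData and the trait above
    class ProducerData[K,V](val getTopic: String, val getKey: K, val getData: Seq[V])

    trait EventHandler[K,V] {
      def handle(events: Seq[ProducerData[K,V]])
      def close
    }

    // Records batches instead of sending them -- useful as a test double
    class MockEventHandler[K,V] extends EventHandler[K,V] {
      val batches = new ListBuffer[Seq[ProducerData[K,V]]]
      def handle(events: Seq[ProducerData[K,V]]) { batches += events }
      def close { }
    }

    object EventHandlerSketch {
      def main(args: Array[String]) {
        val handler = new MockEventHandler[String,String]
        handler.handle(Seq(new ProducerData("topic1", "key1", Seq("msg1", "msg2"))))
        println(handler.batches.size)  // 1
        handler.close
      }
    }
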
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Feb 8 02:59:54 2012
@@ -20,32 +20,21 @@ package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer
-import kafka.serializer.Encoder
-import kafka.producer.SyncProducer
+import kafka.producer.ProducerData
-private[async] class ProducerSendThread[T](val threadName: String,
- val queue: BlockingQueue[QueueItem[T]],
- val serializer: Encoder[T],
- val underlyingProducer: SyncProducer,
- val handler: EventHandler[T],
- val cbkHandler: CallbackHandler[T],
- val queueTime: Long,
- val batchSize: Int,
- val shutdownCommand: Any) extends Thread(threadName) with Logging {
+class ProducerSendThread[K,V](val threadName: String,
+ val queue: BlockingQueue[ProducerData[K,V]],
+ val handler: EventHandler[K,V],
+ val queueTime: Long,
+ val batchSize: Int) extends Thread(threadName) with Logging {
private val shutdownLatch = new CountDownLatch(1)
+ private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
override def run {
try {
- val remainingEvents = processEvents
- debug("Remaining events = " + remainingEvents.size)
-
- // handle remaining events
- if(remainingEvents.size > 0) {
- debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size))
- tryToHandle(remainingEvents)
- }
+ processEvents
}catch {
case e => error("Error in sending events: ", e)
}finally {
@@ -53,34 +42,30 @@ private[async] class ProducerSendThread[
}
}
- def awaitShutdown = shutdownLatch.await
-
def shutdown = {
- handler.close
- info("Shutdown thread complete")
+ info("Beging shutting down ProducerSendThread")
+ queue.put(shutdownCommand)
+ shutdownLatch.await
+ info("Shutdown ProducerSendThread complete")
}
- private def processEvents(): Seq[QueueItem[T]] = {
+ private def processEvents() {
var lastSend = SystemTime.milliseconds
- var events = new ListBuffer[QueueItem[T]]
+ var events = new ListBuffer[ProducerData[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
- .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
+ .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
currentQueueItem =>
val elapsed = (SystemTime.milliseconds - lastSend)
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
- trace("Dequeued item for topic %s and partition %d"
- .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
- // handle the dequeued current item
- if(cbkHandler != null)
- events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
- else
- events += currentQueueItem
+ trace("Dequeued item for topic %s, partition key: %s, data: %s"
+ .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
+ events += currentQueueItem
// check if the batch size is reached
full = events.size >= batchSize
@@ -91,36 +76,22 @@ private[async] class ProducerSendThread[
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
- events = new ListBuffer[QueueItem[T]]
+ events = new ListBuffer[ProducerData[K,V]]
}
}
if(queue.size > 0)
throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
.format(queue.size))
- if(cbkHandler != null) {
- info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
- val addedEvents = cbkHandler.lastBatchBeforeClose
- logEvents("last batch before close", addedEvents)
- events = events ++ addedEvents
- }
- events
}
- def tryToHandle(events: Seq[QueueItem[T]]) {
+ def tryToHandle(events: Seq[ProducerData[K,V]]) {
try {
debug("Handling " + events.size + " events")
if(events.size > 0)
- handler.handle(events, underlyingProducer, serializer)
+ handler.handle(events)
}catch {
- case e: Exception => error("Error in handling batch of " + events.size + " events", e)
+ case e => error("Error in handling batch of " + events.size + " events", e)
}
}
- private def logEvents(tag: String, events: Iterable[QueueItem[T]]) {
- if(logger.isTraceEnabled) {
- trace("events for " + tag + ":")
- for (event <- events)
- trace(event.getData.toString)
- }
- }
}
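
The send thread's loop above implements time-or-size batching: poll with a timeout that shrinks toward the queue.time deadline, flush when the batch fills or the poll times out, and stop on a sentinel object compared by reference (ne/eq), so an equal-looking value cannot shut the thread down by accident. A condensed, standalone sketch of the same pattern with illustrative names:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    object BatchLoopSketch {
      val queue = new LinkedBlockingQueue[String]
      val shutdownCommand = new String("SHUTDOWN")  // matched by reference, not value
      val queueTime = 100L                          // max ms before a partial flush
      val batchSize = 5

      def run(dispatch: Seq[String] => Unit) {
        var lastSend = System.currentTimeMillis
        var events: List[String] = Nil
        var done = false
        while(!done) {
          val timeout = math.max(0, (lastSend + queueTime) - System.currentTimeMillis)
          val item = queue.poll(timeout, TimeUnit.MILLISECONDS)
          if(item != null && (item eq shutdownCommand))
            done = true
          else if(item != null)
            events = item :: events
          val expired = item == null && !done  // poll timed out: queue.time reached
          if(done || expired || events.size >= batchSize) {
            if(events.nonEmpty) dispatch(events.reverse)
            events = Nil
            lastSend = System.currentTimeMillis
          }
        }
      }

      def main(args: Array[String]) {
        queue.put("m1"); queue.put("m2"); queue.put(shutdownCommand)
        run(batch => println("dispatching " + batch))  // dispatching List(m1, m2)
      }
    }
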
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Wed Feb 8 02:59:54 2012
@@ -20,9 +20,7 @@ package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
-import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.DefaultEncoder
-import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{ZKStringSerializer, Logging}
import kafka.api.OffsetRequest
@@ -171,9 +169,7 @@ object ReplayLogProducer extends Logging
props.put("producer.type", "async")
val producerConfig = new ProducerConfig(props)
- val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder,
- new DefaultEventHandler[Message](producerConfig, null),
- null, new DefaultPartitioner[Message])
+ val producer = new Producer[Message, Message](producerConfig)
override def run() {
info("Starting consumer thread..")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Feb 8 02:59:54 2012
@@ -24,17 +24,22 @@ import java.util.concurrent.atomic._
import java.lang.management._
import java.util.zip.CRC32
import javax.management._
-import java.util.Properties
import scala.collection._
import scala.collection.mutable
import kafka.message.{NoCompressionCodec, CompressionCodec}
import org.I0Itec.zkclient.ZkClient
+import java.util.{Random, Properties}
/**
* Helper functions!
*/
object Utils extends Logging {
+ val random = new Random
+
+ def getNextRandomInt(): Int = random.nextInt
+ def getNextRandomInt(upper: Int): Int = random.nextInt(upper)
+
/**
* Wrap the given function in a java.lang.Runnable
* @param fun A function
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Feb 8 02:59:54 2012
@@ -66,7 +66,7 @@ class ZookeeperConsumerConnectorTest ext
// also the iterator should support re-entrant, so loop it twice
for (i <- 0 until 2) {
try {
- getMessages(nMessages*2, topicMessageStreams0)
+ getMessagesSortedByChecksum(nMessages*2, topicMessageStreams0)
fail("should get an exception")
}
catch {
@@ -84,7 +84,7 @@ class ZookeeperConsumerConnectorTest ext
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
- val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1, receivedMessages1)
// commit consumed offsets
zkConsumerConnector1.commitOffsets
@@ -97,8 +97,8 @@ class ZookeeperConsumerConnectorTest ext
// send some messages to each broker
val sentMessages2 = sendMessages(nMessages, "batch2")
Thread.sleep(200)
- val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
- val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages2, receivedMessages2)
@@ -111,8 +111,8 @@ class ZookeeperConsumerConnectorTest ext
Thread.sleep(200)
val sentMessages3 = sendMessages(nMessages, "batch3")
Thread.sleep(200)
- val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
- val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages3, receivedMessages3)
@@ -135,7 +135,7 @@ class ZookeeperConsumerConnectorTest ext
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
- val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1, receivedMessages1)
// commit consumed offsets
zkConsumerConnector1.commitOffsets
@@ -149,8 +149,8 @@ class ZookeeperConsumerConnectorTest ext
// send some messages to each broker
val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
Thread.sleep(200)
- val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
- val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages2, receivedMessages2)
@@ -164,8 +164,8 @@ class ZookeeperConsumerConnectorTest ext
Thread.sleep(200)
val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
Thread.sleep(200)
- val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
- val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages3, receivedMessages3)
@@ -195,13 +195,13 @@ class ZookeeperConsumerConnectorTest ext
}
val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
- getMessages(100, topicMessageStreams0)
+ getMessagesSortedByChecksum(100, topicMessageStreams0)
zkConsumerConnector0.shutdown
// at this point, only some part of the message set was consumed. So consumed offset should still be 0
// also fetched offset should be 0
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
- val receivedMessages = getMessages(400, topicMessageStreams1)
+ val receivedMessages = getMessagesSortedByChecksum(400, topicMessageStreams1)
val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sortedSentMessages, sortedReceivedMessages)
@@ -268,19 +268,8 @@ class ZookeeperConsumerConnectorTest ext
messages.sortWith((s,t) => s.checksum < t.checksum)
}
- def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
- var messages: List[Message] = Nil
- for ((topic, messageStreams) <- topicMessageStreams) {
- for (messageStream <- messageStreams) {
- val iterator = messageStream.iterator
- for (i <- 0 until nMessagesPerThread) {
- assertTrue(iterator.hasNext)
- val message = iterator.next
- messages ::= message
- debug("received message: " + Utils.toString(message.payload, "UTF-8"))
- }
- }
- }
+ def getMessagesSortedByChecksum(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
+ val messages = TestUtils.getConsumedMessages(nMessagesPerThread, topicMessageStreams)
messages.sortWith((s,t) => s.checksum < t.checksum)
}
}
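
The per-stream draining loop removed above now lives in TestUtils.getConsumedMessages, leaving only the checksum sort here. A sketch of what such a helper amounts to, with simplified stand-ins for the message and stream types (the real helper operates on KafkaMessageStream):

    object ConsumeSketch {
      case class Msg(payload: String, checksum: Long)

      // Drain n messages from every stream of every topic, like the removed loop,
      // then let the caller sort by checksum for order-insensitive assertions
      def getConsumedMessages(n: Int,
                              topicStreams: Map[String, List[Iterator[Msg]]]): List[Msg] = {
        var messages: List[Msg] = Nil
        for((_, streams) <- topicStreams; stream <- streams; _ <- 0 until n) {
          assert(stream.hasNext)
          messages ::= stream.next
        }
        messages
      }

      def main(args: Array[String]) {
        val stream = Iterator(Msg("a", 3L), Msg("b", 1L))
        val got = getConsumedMessages(2, Map("topic1" -> List(stream)))
        println(got.sortWith((s,t) => s.checksum < t.checksum))  // List(Msg(b,1), Msg(a,3))
      }
    }
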
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Wed Feb 8 02:59:54 2012
@@ -52,7 +52,7 @@ class PrimitiveApiTest extends JUnit3Sui
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
- stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+ stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
Thread.sleep(200)
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
@@ -73,7 +73,7 @@ class PrimitiveApiTest extends JUnit3Sui
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
- stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+ stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
Thread.sleep(200)
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Feb 8 02:59:54 2012
@@ -67,7 +67,7 @@ class ZookeeperConsumerConnectorTest ext
def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
var messages: List[Message] = Nil
- val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
+ val producer = new kafka.javaapi.producer.SyncProducer(TestUtils.createProducer("localhost", conf.port))
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x =>
new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray