Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/08 03:59:55 UTC

svn commit: r1241754 [1/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/common/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/producer/ main/scala/kafka/javaapi/producer/async/ main/scala/kafka/producer/ main/scala/kafka/producer/...

Author: junrao
Date: Wed Feb  8 02:59:54 2012
New Revision: 1241754

URL: http://svn.apache.org/viewvc?rev=1241754&view=rev
Log:
refactor the async producer; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-253

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/async/
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/QueueClosedException.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
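
As an orientation for the diffs below, the refactored producer keeps kafka.producer.Producer as the single public entry point: the sync/async split is now selected via producer.type inside that class, and partitioning, serialization and broker routing move into the new EventHandler. A minimal usage sketch against the API as it appears in this patch (broker address, topic and the stock StringEncoder are illustrative assumptions, not part of the patch):

    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    object ProducerUsageSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        // static broker list, format brokerId:host:port as parsed by ConfigBrokerPartitionInfo
        props.put("broker.list", "0:localhost:9092")
        // "sync" sends on the caller's thread; "async" enqueues and lets ProducerSendThread batch
        props.put("producer.type", "sync")
        // assumes the stock string encoder shipped with Kafka
        props.put("serializer.class", "kafka.serializer.StringEncoder")

        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          // one topic, optional key, one or more messages per ProducerData
          producer.send(new ProducerData[String, String]("test-topic", "a message"))
        } finally {
          producer.close()
        }
      }
    }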

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala?rev=1241754&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FailedToSendMessageException.scala Wed Feb  8 02:59:54 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates that the producer failed to send a message to the broker after exhausting all retries
+*/
+class FailedToSendMessageException(message: String, t: Throwable) extends RuntimeException(message, t) {
+}
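
The new exception reaches callers of Producer.send once the retry loop in DefaultEventHandler (later in this patch) is exhausted. A hedged sketch of how calling code might handle it; the helper name is made up for illustration:

    import kafka.common.FailedToSendMessageException
    import kafka.producer.{Producer, ProducerData}

    object SendWithFallback {
      // Hypothetical helper: by the time this exception surfaces, the producer has already
      // retried producer.num.retries times inside DefaultEventHandler.
      def sendOrLog[K, V](producer: Producer[K, V], data: ProducerData[K, V]) {
        try {
          producer.send(data)
        } catch {
          case e: FailedToSendMessageException =>
            System.err.println("giving up on topic " + data.getTopic + ": " + e.getMessage)
        }
      }
    }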

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Wed Feb  8 02:59:54 2012
@@ -16,8 +16,6 @@
 */
 package kafka.javaapi
 
-import kafka.serializer.Encoder
-import kafka.producer.async.QueueItem
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {
@@ -30,91 +28,6 @@ private[javaapi] object Implicits extend
                                                    messageSet.getErrorCode)
   }
 
-  implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
-    debug("Implicit instantiation of Java Sync Producer")
-    new kafka.javaapi.producer.SyncProducer(producer)
-  }
-
-  implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
-    debug("Implicit instantiation of Sync Producer")
-    producer.underlying
-  }
-
-  implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T])
-       : kafka.producer.async.EventHandler[T] = {
-    new kafka.producer.async.EventHandler[T] {
-      override def init(props: java.util.Properties) { eventHandler.init(props) }
-      override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
-        import collection.JavaConversions._
-        eventHandler.handle(asList(events), producer, encoder)
-      }
-      override def close { eventHandler.close }
-    }
-  }
-
-  implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T])
-    : kafka.javaapi.producer.async.EventHandler[T] = {
-    new kafka.javaapi.producer.async.EventHandler[T] {
-      override def init(props: java.util.Properties) { eventHandler.init(props) }
-      override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
-                          encoder: Encoder[T]) {
-        import collection.JavaConversions._
-        eventHandler.handle(asBuffer(events), producer, encoder)
-      }
-      override def close { eventHandler.close }
-    }
-  }
-
-  implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T])
-      : kafka.producer.async.CallbackHandler[T] = {
-    new kafka.producer.async.CallbackHandler[T] {
-      import collection.JavaConversions._
-      override def init(props: java.util.Properties) { cbkHandler.init(props)}
-      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
-        cbkHandler.beforeEnqueue(data)
-      }
-      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
-        cbkHandler.afterEnqueue(data, added)
-      }
-      override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
-        cbkHandler.afterDequeuingExistingData(data)
-      }
-      override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
-        asList(cbkHandler.beforeSendingData(asList(data)))
-      }
-      override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
-        asBuffer(cbkHandler.lastBatchBeforeClose)
-      }
-      override def close { cbkHandler.close }
-    }
-  }
-
-  implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
-      : kafka.javaapi.producer.async.CallbackHandler[T] = {
-    new kafka.javaapi.producer.async.CallbackHandler[T] {
-      import collection.JavaConversions._
-      override def init(props: java.util.Properties) { cbkHandler.init(props)}
-      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
-        cbkHandler.beforeEnqueue(data)
-      }
-      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
-        cbkHandler.afterEnqueue(data, added)
-      }
-      override def afterDequeuingExistingData(data: QueueItem[T] = null)
-      : java.util.List[QueueItem[T]] = {
-        asList(cbkHandler.afterDequeuingExistingData(data))
-      }
-      override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
-      : java.util.List[QueueItem[T]] = {
-        asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
-      }
-      override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
-        asList(cbkHandler.lastBatchBeforeClose)
-      }
-      override def close { cbkHandler.close }
-    }
-  }
-
   implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
     response.underlying
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Wed Feb  8 02:59:54 2012
@@ -17,82 +17,11 @@
 
 package kafka.javaapi.producer
 
-import kafka.utils.Utils
-import kafka.producer.async.QueueItem
-import java.util.Properties
-import kafka.producer.{ProducerPool, ProducerConfig, Partitioner}
-import kafka.serializer.Encoder
+import kafka.producer.ProducerConfig
 
-class Producer[K,V](config: ProducerConfig,
-                    partitioner: Partitioner[K],
-                    producerPool: ProducerPool[V],
-                    populateProducerPool: Boolean = true) /* for testing purpose only. Applications should ideally */
-                                                          /* use the other constructor*/
+class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
-
-  private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null)
-
-  /**
-   * This constructor can be used when all config parameters will be specified through the
-   * ProducerConfig object
-   * @param config Producer Configuration object
-   */
-  def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
-    new ProducerPool[V](config, Utils.getObject(config.serializerClass)))
-
-  /**
-   * This constructor can be used to provide pre-instantiated objects for all config parameters
-   * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
-   * callback handler
-   * @param config Producer Configuration object
-   * @param encoder Encoder used to convert an object of type V to a kafka.message.Message
-   * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to
-   * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer
-   * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject
-   * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline.
-   * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom
-   * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
-   * object in the  send API
-   */
-  def this(config: ProducerConfig,
-           encoder: Encoder[V],
-           eventHandler: kafka.javaapi.producer.async.EventHandler[V],
-           cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V],
-           partitioner: Partitioner[K]) = {
-    this(config, partitioner,
-         new ProducerPool[V](config, encoder,
-                             new kafka.producer.async.EventHandler[V] {
-                               override def init(props: Properties) { eventHandler.init(props) }
-                               override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
-                                                   encoder: Encoder[V]) {
-                                 import collection.JavaConversions._
-                                 import kafka.javaapi.Implicits._
-                                 eventHandler.handle(asList(events), producer, encoder)
-                               }
-                               override def close { eventHandler.close }
-                             },
-                             new kafka.producer.async.CallbackHandler[V] {
-                               import collection.JavaConversions._
-                               override def init(props: Properties) { cbkHandler.init(props)}
-                               override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = {
-                                 cbkHandler.beforeEnqueue(data)
-                               }
-                               override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) {
-                                 cbkHandler.afterEnqueue(data, added)
-                               }
-                               override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
-                                 cbkHandler.afterDequeuingExistingData(data)
-                               }
-                               override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
-                                 asList(cbkHandler.beforeSendingData(asList(data)))
-                               }
-                               override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
-                                 asBuffer(cbkHandler.lastBatchBeforeClose)
-                               }
-                               override def close { cbkHandler.close }
-                             }))
-  }
-
+  def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
   /**
    * Sends the data to a single topic, partitioned by key, using either the
    * synchronous or the asynchronous producer
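
The Java API producer is now a thin wrapper: its only public constructor takes a ProducerConfig and simply instantiates a kafka.producer.Producer underneath, with the (underlying) constructor kept around for tests. A sketch of using the wrapper as it stands after this patch (ZooKeeper address and topic are illustrative, and close() is assumed to delegate to the underlying producer as before):

    import java.util.Properties
    import kafka.producer.ProducerConfig

    object JavaApiProducerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")                       // illustrative; needs a running ZooKeeper
        props.put("serializer.class", "kafka.serializer.StringEncoder") // assumed stock encoder
        val config = new ProducerConfig(props)

        // the auxiliary constructor wraps new kafka.producer.Producer[K,V](config)
        val producer = new kafka.javaapi.producer.Producer[String, String](config)

        // javaapi.ProducerData carries a java.util.List of messages
        val messages = java.util.Arrays.asList("hello", "world")
        producer.send(new kafka.javaapi.producer.ProducerData[String, String]("test-topic", messages))
        producer.close()
      }
    }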

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala Wed Feb  8 02:59:54 2012
@@ -18,14 +18,16 @@ package kafka.javaapi.producer
 
 import scala.collection.JavaConversions._
 
-class ProducerData[K, V](private val topic: String,
-                         private val key: K,
-                         private val data: java.util.List[V]) {
+case class ProducerData[K, V](topic: String,
+                              key: K,
+                              data: java.util.List[V]) {
 
   def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
 
   def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
 
+  def this(t: String, k: K, d: V) = this(topic = t, key = k, data = asList(List(d)))
+
   def getTopic: String = topic
 
   def getKey: K = key

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Wed Feb  8 02:59:54 2012
@@ -16,7 +16,7 @@
 */
 package kafka.producer
 
-import collection.immutable.Map
+import collection.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Wed Feb  8 02:59:54 2012
@@ -17,10 +17,11 @@
 package kafka.producer
 
 import collection.mutable.HashMap
-import collection.immutable.Map
+import collection.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 import kafka.common.InvalidConfigException
+import kafka.api.ProducerRequest
 
 private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo {
   private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo
@@ -66,13 +67,11 @@ private[producer] class ConfigBrokerPart
       val brokerInfo = bInfo.split(":")
       if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")
     }
-    val brokerPartitions = brokerInfoList.map(bInfo => (bInfo.split(":").head.toInt, 1))
+    val brokerIds = brokerInfoList.map(bInfo => bInfo.split(":").head.toInt)
     var brokerParts = SortedSet.empty[Partition]
-    brokerPartitions.foreach { bp =>
-      for(i <- 0 until bp._2) {
-        val bidPid = new Partition(bp._1, i)
-        brokerParts = brokerParts + bidPid
-      }
+    brokerIds.foreach { bid =>
+        val bidPid = new Partition(bid, ProducerRequest.RandomPartition)
+        brokerParts += bidPid
     }
     brokerParts
   }
@@ -90,7 +89,7 @@ private[producer] class ConfigBrokerPart
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
         brokerIdHostPort(1), brokerIdHostPort(2).toInt))
     }
-    brokerInfo.toMap
+    brokerInfo
   }
 
-}
+}
\ No newline at end of file
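
With a static broker.list, each entry now contributes exactly one Partition whose partition id is the ProducerRequest.RandomPartition sentinel, leaving the actual partition choice to the broker at send time (previously one Partition(brokerId, 0) was created per broker). A reduced, self-contained sketch of that parsing; the Partition stand-in and the -1 sentinel value are assumptions made so the logic can be shown in isolation:

    object BrokerListSketch {
      // stand-in for kafka.cluster.Partition; RandomPartition mirrors ProducerRequest.RandomPartition,
      // assumed here to be a -1 sentinel meaning "let the broker pick"
      case class Partition(brokerId: Int, partId: Int)
      val RandomPartition = -1

      // mirrors getConfigTopicPartitionInfo after this patch: one Partition per broker.list entry
      def parseBrokerList(brokerList: String): Seq[Partition] = {
        val entries = brokerList.split(",")
        require(entries.forall(_.split(":").length >= 3), "broker.list has invalid value")
        entries.map(e => Partition(e.split(":").head.toInt, RandomPartition)).toList
      }

      def main(args: Array[String]) {
        // prints List(Partition(0,-1), Partition(1,-1))
        println(parseBrokerList("0:host0:9092,1:host1:9092"))
      }
    }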

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Wed Feb  8 02:59:54 2012
@@ -16,82 +16,51 @@
  */
 package kafka.producer
 
-import async.{CallbackHandler, EventHandler}
-import kafka.serializer.Encoder
+import async._
 import kafka.utils._
-import java.util.Properties
-import kafka.cluster.{Partition, Broker}
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
-import kafka.api.ProducerRequest
+import kafka.common.InvalidConfigException
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
+import kafka.serializer.Encoder
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
 
 class Producer[K,V](config: ProducerConfig,
-                    partitioner: Partitioner[K],
-                    producerPool: ProducerPool[V],
-                    populateProducerPool: Boolean,
-                    private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
-                                                          /* use the other constructor*/
+                    private val eventHandler: EventHandler[K,V]) // for testing only
 extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
   if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
-    warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
-  private val random = new java.util.Random
-  // check if zookeeper based auto partition discovery is enabled
-  private val zkEnabled = Utils.propertyExists(config.zkConnect)
-  if(brokerPartitionInfo == null) {
-    zkEnabled match {
-      case true =>
-        val zkProps = new Properties()
-        zkProps.put("zk.connect", config.zkConnect)
-        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
-        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
-        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
-        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
-      case false =>
-        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
-    }
-  }
-  // pool of producers, one per broker
-  if(populateProducerPool) {
-    val allBrokers = brokerPartitionInfo.getAllBrokerInfo
-    allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+    throw new InvalidConfigException("Only one of zk.connect and broker.list should be provided")
+  if (config.batchSize > config.queueSize)
+    throw new InvalidConfigException("Batch size can't be larger than queue size.")
+
+  private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
+  private var sync: Boolean = true
+  private var producerSendThread: ProducerSendThread[K,V] = null
+  config.producerType match {
+    case "sync" =>
+    case "async" =>
+      sync = false
+      val asyncProducerID = Utils.getNextRandomInt
+      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
+        eventHandler, config.queueTime, config.batchSize)
+      producerSendThread.start
+    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
-/**
- * This constructor can be used when all config parameters will be specified through the
- * ProducerConfig object
- * @param config Producer Configuration object
- */
-  def this(config: ProducerConfig) =  this(config, Utils.getObject(config.partitionerClass),
-    new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
-
   /**
-   * This constructor can be used to provide pre-instantiated objects for all config parameters
-   * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
-   * callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner
-   * will not be picked up from the config.
+   * This constructor can be used when all config parameters will be specified through the
+   * ProducerConfig object
    * @param config Producer Configuration object
-   * @param encoder Encoder used to convert an object of type V to a kafka.message.Message. If this is null it
-   * throws an InvalidConfigException
-   * @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to
-   * dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it
-   * uses the DefaultEventHandler
-   * @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject
-   * callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does
-   * not use the callback handler and hence does not invoke any callbacks
-   * @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom
-   * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
-   * object in the  send API. If this is null, producer uses DefaultPartitioner
    */
-  def this(config: ProducerConfig,
-           encoder: Encoder[V],
-           eventHandler: EventHandler[V],
-           cbkHandler: CallbackHandler[V],
-           partitioner: Partitioner[K]) =
-    this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
-         new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+  def this(config: ProducerConfig) =
+    this(config,
+         new DefaultEventHandler[K,V](config,
+                                      Utils.getObject[Partitioner[K]](config.partitionerClass),
+                                      Utils.getObject[Encoder[V]](config.serializerClass),
+                                      new ProducerPool(config),
+                                      populateProducerPool= true,
+                                      brokerPartitionInfo= null))
 
   /**
    * Sends the data, partitioned by key to the topic using either the
@@ -99,108 +68,49 @@ extends Logging {
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
   def send(producerData: ProducerData[K,V]*) {
-    zkEnabled match {
-      case true => zkSend(producerData: _*)
-      case false => configSend(producerData: _*)
+    if (hasShutdown.get)
+      throw new ProducerClosedException
+    recordStats(producerData: _*)
+    sync match {
+      case true => eventHandler.handle(producerData)
+      case false => asyncSend(producerData: _*)
     }
   }
 
-  private def zkSend(producerData: ProducerData[K,V]*) {
-    val producerPoolRequests = producerData.map { pd =>
-      var brokerIdPartition: Option[Partition] = None
-      var brokerInfoOpt: Option[Broker] = None
-
-      var numRetries: Int = 0
-      while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
-        if(numRetries > 0) {
-          info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
-          brokerPartitionInfo.updateInfo
-        }
-
-        val topicPartitionsList = getPartitionListForTopic(pd)
-        val totalNumPartitions = topicPartitionsList.length
-
-        val partitionId = getPartition(pd.getKey, totalNumPartitions)
-        brokerIdPartition = Some(topicPartitionsList(partitionId))
-        brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
-        numRetries += 1
+  private def recordStats(producerData: ProducerData[K,V]*) {
+    for (data <- producerData)
+      ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size)
+  }
+
+  private def asyncSend(producerData: ProducerData[K,V]*) {
+    for (data <- producerData) {
+      val added = config.enqueueTimeoutMs match {
+        case 0  =>
+          queue.offer(data)
+        case _  =>
+          try {
+            config.enqueueTimeoutMs < 0 match {
+            case true =>
+              queue.put(data)
+              true
+            case _ =>
+              queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+            }
+          }
+          catch {
+            case e: InterruptedException =>
+              false
+          }
       }
-
-      brokerInfoOpt match {
-        case Some(brokerInfo) =>
-          debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                  " on partition " + brokerIdPartition.get.partId)
-        case None =>
-          throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
-            pd.getTopic + " and key: " + pd.getKey)
+      if(!added) {
+        AsyncProducerStats.recordDroppedEvents
+        error("Event queue is full of unsent messages, could not send event: " + data.toString)
+        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
+      }else {
+        trace("Added to send queue an event: " + data.toString)
+        trace("Remaining queue size: " + queue.remainingCapacity)
       }
-      producerPool.getProducerPoolData(pd.getTopic,
-        new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
-        pd.getData)
     }
-    producerPool.send(producerPoolRequests: _*)
-  }
-
-  private def configSend(producerData: ProducerData[K,V]*) {
-    val producerPoolRequests = producerData.map { pd =>
-    // find the broker partitions registered for this topic
-      val topicPartitionsList = getPartitionListForTopic(pd)
-      val totalNumPartitions = topicPartitionsList.length
-
-      val randomBrokerId = random.nextInt(totalNumPartitions)
-      val brokerIdPartition = topicPartitionsList(randomBrokerId)
-      val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-
-      debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                " on a randomly chosen partition")
-      val partition = ProducerRequest.RandomPartition
-      debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
-          brokerIdPartition.partId)
-      producerPool.getProducerPoolData(pd.getTopic,
-        new Partition(brokerIdPartition.brokerId, partition),
-        pd.getData)
-    }
-    producerPool.send(producerPoolRequests: _*)
-  }
-
-  private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
-    debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
-    debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
-    val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
-    topicPartitionsList
-  }
-
-  /**
-   * Retrieves the partition id and throws an InvalidPartitionException if
-   * the value of partition is not between 0 and numPartitions-1
-   * @param key the partition key
-   * @param numPartitions the total number of available partitions
-   * @returns the partition id
-   */
-  private def getPartition(key: K, numPartitions: Int): Int = {
-    if(numPartitions <= 0)
-      throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
-              "\n Valid values are > 0")
-    val partition = if(key == null) random.nextInt(numPartitions)
-                    else partitioner.partition(key , numPartitions)
-    if(partition < 0 || partition >= numPartitions)
-      throw new InvalidPartitionException("Invalid partition id : " + partition +
-              "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
-    partition
-  }
-  
-  /**
-   * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
-   * on registration of new broker in zookeeper
-   * @param bid the id of the broker
-   * @param host the hostname of the broker
-   * @param port the port of the broker
-   */
-  private def producerCbk(bid: Int, host: String, port: Int) =  {
-    if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
-    else debug("Skipping the callback since populateProducerPool = false")
   }
 
   /**
@@ -210,8 +120,38 @@ extends Logging {
   def close() = {
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
-      producerPool.close
-      brokerPartitionInfo.close
+      if (producerSendThread != null)
+        producerSendThread.shutdown
+      eventHandler.close
+    }
+  }
+}
+
+trait ProducerTopicStatMBean {
+  def getMessagesPerTopic: Long
+}
+
+@threadsafe
+class ProducerTopicStat extends ProducerTopicStatMBean {
+  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+
+  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
+
+  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+}
+
+object ProducerTopicStat extends Logging {
+  private val stats = new Pool[String, ProducerTopicStat]
+
+  def getProducerTopicStat(topic: String): ProducerTopicStat = {
+    var stat = stats.get(topic)
+    if (stat == null) {
+      stat = new ProducerTopicStat
+      if (stats.putIfNotExists(topic, stat) == null)
+        Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." + topic)
+      else
+        stat = stats.get(topic)
     }
+    return stat
   }
 }
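
Two things are worth calling out in the rewritten Producer: the async path enqueues onto a bounded LinkedBlockingQueue drained by ProducerSendThread, with the enqueue timeout deciding what happens when that queue is full (0 drops immediately, a negative value blocks until space frees up, a positive value waits that many milliseconds before raising QueueFullException); separately, each send now records per-topic message counts through the new ProducerTopicStat MBeans. A reduced sketch of just the enqueue decision, independent of the producer classes; the method and object names are made up:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    object EnqueueTimeoutSketch {
      // mirrors Producer.asyncSend: returns true if the element made it onto the queue
      def tryEnqueue[T](queue: LinkedBlockingQueue[T], data: T, enqueueTimeoutMs: Int): Boolean =
        enqueueTimeoutMs match {
          case 0 =>
            queue.offer(data)                                     // non-blocking: drop if full
          case t if t < 0 =>
            try { queue.put(data); true }                         // block until space is available
            catch { case e: InterruptedException => false }
          case t =>
            try { queue.offer(data, t, TimeUnit.MILLISECONDS) }   // wait at most t ms
            catch { case e: InterruptedException => false }
        }

      def main(args: Array[String]) {
        val q = new LinkedBlockingQueue[String](1)
        println(tryEnqueue(q, "first", 0))   // true
        println(tryEnqueue(q, "second", 0))  // false: caller would raise QueueFullException
      }
    }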

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala?rev=1241754&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerClosedException.scala Wed Feb  8 02:59:54 2012
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+class ProducerClosedException() extends RuntimeException("producer already closed") {
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Feb  8 02:59:54 2012
@@ -17,13 +17,13 @@
 
 package kafka.producer
 
-import async.AsyncProducerConfigShared
+import async.AsyncProducerConfig
 import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.common.InvalidConfigException
 
 class ProducerConfig(val props: Properties) extends ZKConfig(props)
-        with AsyncProducerConfigShared with SyncProducerConfigShared{
+        with AsyncProducerConfig with SyncProducerConfigShared{
 
   /** For bypassing zookeeper based auto partition discovery, use this config   *
    *  to pass in static broker and per-broker partition information. Format-    *
@@ -37,7 +37,7 @@ class ProducerConfig(val props: Properti
     throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
 
   /** the partitioner class for partitioning events amongst sub-topics */
-  val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
+  val partitionerClass = Utils.getString(props, "partitioner.class", null)
 
   /** this parameter specifies whether the messages are sent asynchronously *
    * or not. Valid values are - async for asynchronous send                 *
@@ -71,5 +71,7 @@ class ProducerConfig(val props: Properti
    * ZK cache needs to be updated.
    * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
    */
-  val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
+  val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
+
+  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5)
 }
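
The ZooKeeper-specific zk.read.num.retries is replaced by two producer-level settings that the new DefaultEventHandler uses for its resend loop after a failed send. A sketch of wiring them up (the property values are illustrative, not recommendations):

    import java.util.Properties
    import kafka.producer.ProducerConfig

    object RetryConfigSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")        // illustrative ZooKeeper address
        props.put("producer.num.retries", "5")           // resend attempts after a send failure
        props.put("producer.retry.backoff.ms", "100")    // sleep between attempts
        // partitioner.class now defaults to null; set it only when keyed partitioning is wanted
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner")

        val config = new ProducerConfig(props)
        println(config.producerRetries + " retries, " + config.producerRetryBackoffMs + " ms backoff")
      }
    }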

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala Wed Feb  8 02:59:54 2012
@@ -23,14 +23,16 @@ package kafka.producer
  * @param key the key used by the partitioner to pick a broker partition
  * @param data variable length data to be published as Kafka messages under topic
  */
-class ProducerData[K, V](private val topic: String,
-                         private val key: K,
-                         private val data: Seq[V]) {
+case class ProducerData[K,V](topic: String,
+                             key: K,
+                             data: Seq[V]) {
 
   def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
 
   def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d))
 
+  def this(t: String, k: K, d: V) = this(topic = t, key = k, data = List(d))
+
   def getTopic: String = topic
 
   def getKey: K = key
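
Making ProducerData a case class gives it structural equality and a readable toString, and the new three-argument auxiliary constructor covers the common keyed, single-message case. A few illustrative constructions (topic, key and message values are made up):

    import kafka.producer.ProducerData

    object ProducerDataSketch {
      def main(args: Array[String]) {
        // unkeyed, single message
        val a = new ProducerData[String, String]("test-topic", "m1")
        // keyed, single message (the constructor added in this patch)
        val b = new ProducerData[String, String]("test-topic", "key-1", "m1")
        // keyed, batch of messages via the primary constructor
        val c = new ProducerData[String, String]("test-topic", "key-1", List("m1", "m2"))

        println(c)                                                          // case-class toString
        println(a == new ProducerData[String, String]("test-topic", "m1"))  // structural equality: true
      }
    }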

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Wed Feb  8 02:59:54 2012
@@ -17,163 +17,42 @@
 
 package kafka.producer
 
-import async._
 import java.util.Properties
-import kafka.serializer.Encoder
-import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
-import kafka.cluster.{Partition, Broker}
-import kafka.api.ProducerRequest
-import kafka.common.{UnavailableProducerException, InvalidConfigException}
-import kafka.utils.{Utils, Logging}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-
-class ProducerPool[V](private val config: ProducerConfig,
-                      private val serializer: Encoder[V],
-                      private val syncProducers: ConcurrentMap[Int, SyncProducer],
-                      private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
-                      private val inputEventHandler: EventHandler[V] = null,
-                      private val cbkHandler: CallbackHandler[V] = null) extends Logging {
-
-  private var eventHandler = inputEventHandler
-  if(eventHandler == null)
-    eventHandler = new DefaultEventHandler(config, cbkHandler)
-
-  if(serializer == null)
-    throw new InvalidConfigException("serializer passed in is null!")
-
-  private var sync: Boolean = true
-  config.producerType match {
-    case "sync" =>
-    case "async" => sync = false
-    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
-  }
+import kafka.cluster.Broker
+import kafka.utils.Logging
+import java.util.concurrent.ConcurrentHashMap
+
+class ProducerPool(private val config: ProducerConfig) extends Logging {
+  private val syncProducers = new ConcurrentHashMap[Int, SyncProducer]
 
-  def this(config: ProducerConfig, serializer: Encoder[V],
-           eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) =
-    this(config, serializer,
-         new ConcurrentHashMap[Int, SyncProducer](),
-         new ConcurrentHashMap[Int, AsyncProducer[V]](),
-         eventHandler, cbkHandler)
-
-  def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer,
-                                                                  new ConcurrentHashMap[Int, SyncProducer](),
-                                                                  new ConcurrentHashMap[Int, AsyncProducer[V]](),
-                                                                  Utils.getObject(config.eventHandler),
-                                                                  Utils.getObject(config.cbkHandler))
-  /**
-   * add a new producer, either synchronous or asynchronous, connecting
-   * to the specified broker 
-   * @param bid the id of the broker
-   * @param host the hostname of the broker
-   * @param port the port of the broker
-   */
   def addProducer(broker: Broker) {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
     props.putAll(config.props)
-    if(sync) {
-        val producer = new SyncProducer(new SyncProducerConfig(props))
-        info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
-        syncProducers.put(broker.id, producer)
-    } else {
-        val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
-                                            new SyncProducer(new SyncProducerConfig(props)),
-                                            serializer,
-                                            eventHandler, config.eventHandlerProps,
-                                            cbkHandler, config.cbkHandlerProps)
-        producer.start
-        info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
-        asyncProducers.put(broker.id, producer)
-    }
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+    syncProducers.put(broker.id, producer)
   }
 
   /**
-   * selects either a synchronous or an asynchronous producer, for
-   * the specified broker id and calls the send API on the selected
-   * producer to publish the data to the specified broker partition
-   * @param poolData the producer pool request object
+   *  For testing purposes only
    */
-  def send(poolData: ProducerPoolData[V]*) {
-    val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
-    var remainingRequests = poolData.toSeq
-    distinctBrokers.foreach { bid =>
-      val requestsForThisBid = remainingRequests partition (_.getBidPid.brokerId == bid)
-      remainingRequests = requestsForThisBid._2
-
-      if(sync) {
-        val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
-          new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
-                                   messages = req.getData.map(d => serializer.toMessage(d)): _*)))
-        debug("Fetching sync producer for broker id: " + bid)
-        val producer = syncProducers.get(bid)
-        if(producer != null) {
-          if(producerRequests.size > 1)
-            producer.multiSend(producerRequests.toArray)
-          else
-            producer.send(topic = producerRequests(0).topic,
-                          partition = producerRequests(0).partition,
-                          messages = producerRequests(0).messages)
-          config.compressionCodec match {
-            case NoCompressionCodec => debug("Sending message to broker " + bid)
-            case _ => debug("Sending compressed messages to broker " + bid)
-          }
-        }else
-          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
-            "Sync Producer for broker " + bid + " does not exist in the pool")
-      }else {
-        debug("Fetching async producer for broker id: " + bid)
-        val producer = asyncProducers.get(bid)
-        if(producer != null) {
-          requestsForThisBid._1.foreach { req =>
-            req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId))
-          }
-          if(logger.isDebugEnabled)
-            config.compressionCodec match {
-              case NoCompressionCodec => debug("Sending message")
-              case _ => debug("Sending compressed messages")
-            }
-        }
-        else
-          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
-            "Async Producer for broker " + bid + " does not exist in the pool")
-      }
-    }
+  def addProducer(brokerId: Int, syncProducer: SyncProducer) {
+    syncProducers.put(brokerId, syncProducer)
   }
 
-  /**
-   * Closes all the producers in the pool
-   */
-  def close() = {
-    config.producerType match {
-      case "sync" =>
-        info("Closing all sync producers")
-        val iter = syncProducers.values.iterator
-        while(iter.hasNext)
-          iter.next.close
-      case "async" =>
-        info("Closing all async producers")
-        val iter = asyncProducers.values.iterator
-        while(iter.hasNext)
-          iter.next.close
-    }
+  def getProducer(brokerId: Int) : SyncProducer = {
+    syncProducers.get(brokerId)
   }
 
   /**
-   * This constructs and returns the request object for the producer pool
-   * @param topic the topic to which the data should be published
-   * @param bidPid the broker id and partition id
-   * @param data the data to be published
+   * Closes all the producers in the pool
    */
-  def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = {
-    new ProducerPoolData[V](topic, bidPid, data)
-  }
-
-  class ProducerPoolData[V](topic: String,
-                            bidPid: Partition,
-                            data: Seq[V]) {
-    def getTopic: String = topic
-    def getBidPid: Partition = bidPid
-    def getData: Seq[V] = data
+  def close() = {
+    info("Closing all sync producers")
+    val iter = syncProducers.values.iterator
+    while(iter.hasNext)
+      iter.next.close
   }
 }
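
ProducerPool is now just a map from broker id to SyncProducer; the async machinery that used to live here has moved up into Producer and DefaultEventHandler. A sketch of how the pool is populated and queried, with the broker coordinates made up for illustration:

    import java.util.Properties
    import kafka.cluster.Broker
    import kafka.producer.{ProducerConfig, ProducerPool}

    object ProducerPoolSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.list", "0:localhost:9092")   // illustrative static broker list
        val pool = new ProducerPool(new ProducerConfig(props))

        // normally invoked by DefaultEventHandler as broker metadata is discovered
        pool.addProducer(new Broker(0, "localhost", "localhost", 9092))

        val syncProducer = pool.getProducer(0)   // SyncProducer for broker 0, or null if unknown
        // ... build a ByteBufferMessageSet and call syncProducer.send(...) here ...
        pool.close()
      }
    }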

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Wed Feb  8 02:59:54 2012
@@ -18,13 +18,8 @@ package kafka.producer.async
 
 import java.util.Properties
 import kafka.utils.Utils
-import kafka.producer.SyncProducerConfig
 
-class AsyncProducerConfig(override val props: Properties) extends SyncProducerConfig(props)
-        with AsyncProducerConfigShared {
-}
-
-trait AsyncProducerConfigShared {
+trait AsyncProducerConfig {
   val props: Properties
 
   /* maximum time, in milliseconds, for buffering data on the producer queue */
@@ -46,16 +41,4 @@ trait AsyncProducerConfigShared {
 
   /** the serializer class for events */
   val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder")
-
-  /** the callback handler for one or multiple events */
-  val cbkHandler = Utils.getString(props, "callback.handler", null)
-
-  /** properties required to initialize the callback handler */
-  val cbkHandlerProps = Utils.getProps(props, "callback.handler.props", null)
-
-  /** the handler for events */
-  val eventHandler = Utils.getString(props, "event.handler", null)
-
-  /** properties required to initialize the callback handler */
-  val eventHandlerProps = Utils.getProps(props, "event.handler.props", null)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Wed Feb  8 02:59:54 2012
@@ -18,33 +18,21 @@
 package kafka.producer.async
 
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.BlockingQueue
-import org.apache.log4j.Logger
 import kafka.utils.Utils
 
 class AsyncProducerStats extends AsyncProducerStatsMBean {
   val droppedEvents = new AtomicInteger(0)
-  val numEvents = new AtomicInteger(0)
-
-  def getAsyncProducerEvents: Int = numEvents.get
 
   def getAsyncProducerDroppedEvents: Int = droppedEvents.get
 
   def recordDroppedEvents = droppedEvents.getAndAdd(1)
-
-  def recordEvent = numEvents.getAndAdd(1)
-}
-
-class AsyncProducerQueueSizeStats[T](private val queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerQueueSizeStatsMBean {
-  def getAsyncProducerQueueSize: Int = queue.size
 }
 
 object AsyncProducerStats {
-  private val logger = Logger.getLogger(getClass())
   private val stats = new AsyncProducerStats
-  Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName)
+  val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
 
-  def recordDroppedEvents = stats.recordDroppedEvents
+  Utils.registerMBean(stats, ProducerMBeanName)
 
-  def recordEvent = stats.recordEvent
+  def recordDroppedEvents = stats.recordDroppedEvents
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala Wed Feb  8 02:59:54 2012
@@ -18,7 +18,6 @@
 package kafka.producer.async
 
 trait AsyncProducerStatsMBean {
-  def getAsyncProducerEvents: Int
   def getAsyncProducerDroppedEvents: Int
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Feb  8 02:59:54 2012
@@ -17,45 +17,175 @@
 
 package kafka.producer.async
 
-import collection.mutable.HashMap
-import collection.mutable.Map
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
-import kafka.utils.Logging
-import kafka.producer.{ProducerConfig, SyncProducer}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import kafka.producer._
+import kafka.utils.{ZKConfig, Utils, Logging}
+import kafka.cluster.{Partition, Broker}
+import collection.mutable.{ListBuffer, HashMap}
+import scala.collection.Map
+import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException}
+import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+
+class DefaultEventHandler[K,V](config: ProducerConfig,                               // this api is for testing
+                               private val partitioner: Partitioner[K],              // use the other constructor
+                               private val encoder: Encoder[V],
+                               private val producerPool: ProducerPool,
+                               private val populateProducerPool: Boolean,
+                               private var brokerPartitionInfo: BrokerPartitionInfo)
+  extends EventHandler[K,V] with Logging {
+
+  private val lock = new Object()
+  private val zkEnabled = Utils.propertyExists(config.zkConnect)
+  if(brokerPartitionInfo == null) {
+    zkEnabled match {
+      case true =>
+        val zkProps = new Properties()
+        zkProps.put("zk.connect", config.zkConnect)
+        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
+        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
+        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
+        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
+      case false =>
+        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+    }
+  }
+
+  // pool of producers, one per broker
+  if(populateProducerPool) {
+    val allBrokers = brokerPartitionInfo.getAllBrokerInfo
+    allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port)))
+  }
+
+  /**
+   * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
+   * on registration of new broker in zookeeper
+   * @param bid the id of the broker
+   * @param host the hostname of the broker
+   * @param port the port of the broker
+   */
+  private def producerCbk(bid: Int, host: String, port: Int) =  {
+    if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
+    else debug("Skipping the callback since populateProducerPool = false")
+  }
 
+  def handle(events: Seq[ProducerData[K,V]]) {
+    lock synchronized {
+     val serializedData = serialize(events)
+     handleSerializedData(serializedData, config.producerRetries)
+    }
+  }
+
+  private def handleSerializedData(messages: Seq[ProducerData[K,Message]], requiredRetries: Int) {
+      val partitionedData = partitionAndCollate(messages)
+      for ( (brokerid, eventsPerBrokerMap) <- partitionedData) {
+        if (logger.isTraceEnabled)
+          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partition: %d"
+            .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+        val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+
+        try {
+          send(brokerid, messageSetPerBroker)
+        }
+        catch {
+          case t =>
+            warn("error sending data to broker " + brokerid, t)
+            var numRetries = 0
+            val eventsPerBroker = new ListBuffer[ProducerData[K,Message]]
+            eventsPerBrokerMap.foreach(e => eventsPerBroker.appendAll(e._2))
+            while (numRetries < requiredRetries) {
+              numRetries +=1
+              Thread.sleep(config.producerRetryBackoffMs)
+              try {
+                brokerPartitionInfo.updateInfo
+                handleSerializedData(eventsPerBroker, 0)
+                return
+              }
+              catch {
+                case t => warn("error sending data to broker " + brokerid + " in " + numRetries + " retry", t)
+              }
+            }
+            throw new FailedToSendMessageException("can't send data after " + numRetries + " retries", t)
+        }
+      }
+  }
 
-private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
-                                            val cbkHandler: CallbackHandler[T]) extends EventHandler[T] with Logging {
+  def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
+    events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
+  }
 
-  override def init(props: Properties) { }
+  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
+    val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
+    for (event <- events) {
+      val topicPartitionsList = getPartitionListForTopic(event)
+      val totalNumPartitions = topicPartitionsList.length
+
+      val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+      val brokerPartition = topicPartitionsList(partitionIndex)
+
+      var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+      ret.get(brokerPartition.brokerId) match {
+        case Some(element) =>
+          dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+        case None =>
+          dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+          ret.put(brokerPartition.brokerId, dataPerBroker)
+      }
 
-  override def handle(events: Seq[QueueItem[T]], syncProducer: SyncProducer, serializer: Encoder[T]) {
-    var processedEvents = events
-    if(cbkHandler != null)
-      processedEvents = cbkHandler.beforeSendingData(events)
+      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+      dataPerBroker.get(topicAndPartition) match {
+        case Some(element) =>
+          dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+        case None =>
+          dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+      }
+      dataPerTopicPartition.append(event)
+    }
+    ret
+  }
 
-    if(logger.isTraceEnabled)
-      processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
-        .format(event.getTopic, event.getPartition)))
+  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
+    debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+    val totalNumPartitions = topicPartitionsList.length
+    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+    topicPartitionsList
+  }
 
-    send(serialize(collate(processedEvents), serializer), syncProducer)
+  /**
+   * Retrieves the partition id and throws an InvalidPartitionException if
+   * the value of partition is not between 0 and numPartitions-1
+   * @param key the partition key
+   * @param numPartitions the total number of available partitions
+   * @return the partition id
+   */
+  private def getPartition(key: K, numPartitions: Int): Int = {
+    if(numPartitions <= 0)
+      throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
+              "\n Valid values are > 0")
+    val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
+                    else partitioner.partition(key, numPartitions)
+    if(partition < 0 || partition >= numPartitions)
+      throw new InvalidPartitionException("Invalid partition id : " + partition +
+              "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+    partition
   }
 
-  private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
+  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+      val syncProducer = producerPool.getProducer(brokerId)
       syncProducer.multiSend(requests)
       trace("kafka producer sent messages for topics %s to broker %s:%d"
         .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
     }
   }
 
-  private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
-                        serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
-    val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
+  private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = {
     /** enforce the compressed.topics config here.
      *  If the compression codec is anything other than NoCompressionCodec,
      *    Enable compression only for specified topics if any
@@ -63,55 +193,49 @@ private[kafka] class DefaultEventHandler
      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
 
-    val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
-      ((topicAndEvents._1._1, topicAndEvents._1._2),
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            trace("Sending %d messages with no compression to topic %s on partition %d"
-                .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2))
-            new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                    .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
-                new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+    val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
+      {
+        val topicAndPartition = e._1
+        val produceData = e._2
+        val messages = new ListBuffer[Message]
+        produceData.map(p => messages.appendAll(p.getData))
+
+        ( topicAndPartition,
+          config.compressionCodec match {
+            case NoCompressionCodec =>
+              trace("Sending %d messages with no compression to topic %s on partition %d"
+                  .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+              new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+            case _ =>
+              config.compressedTopics.size match {
+                case 0 =>
                   trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                      .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
-                  new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
-                }
-                else {
-                  trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
-                      .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1,
-                      config.compressedTopics.toString))
-                  new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
-                }
-            }
-        })
-    }
-    messagesPerTopicPartition
-  }
-
-  private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
-    val collatedEvents = new HashMap[(String, Int), Seq[T]]
-    val distinctTopics = events.map(e => e.getTopic).toSeq.distinct
-    val distinctPartitions = events.map(e => e.getPartition).distinct
-
-    var remainingEvents = events
-    distinctTopics foreach { topic =>
-      val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
-      remainingEvents = topicEvents._2
-      distinctPartitions.foreach { p =>
-        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
-        if(topicPartitionEvents.size > 0)
-          collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData))
+                      .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                  new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+                case _ =>
+                  if(config.compressedTopics.contains(topicAndPartition._1)) {
+                    trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                        .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                    new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+                  }
+                  else {
+                    trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+                        .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
+                        config.compressedTopics.toString))
+                    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+                  }
+              }
+          }
+        )
       }
     }
-    collatedEvents
+    messagesPerTopicPartition
   }
 
-  override def close = {
+  def close() {
+    if (producerPool != null)
+      producerPool.close
+    if (brokerPartitionInfo != null)
+      brokerPartitionInfo.close
   }
 }
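
For reference, a rough usage sketch of the new send path (illustrative only: the handler, topic and keys below are assumptions, since a DefaultEventHandler is normally constructed and driven by Producer rather than used directly):

    import kafka.producer.ProducerData

    // `handler` is assumed to be a DefaultEventHandler[String, String] already wired
    // up with a partitioner, encoder, producer pool and broker partition info.
    val events = Seq(
      new ProducerData[String, String]("test-topic", "key-1", Array("message-1", "message-2")),
      new ProducerData[String, String]("test-topic", "key-2", Array("message-3")))

    // handle() serializes the payloads with the configured Encoder, groups the result
    // per broker and per (topic, partition) via partitionAndCollate, and issues one
    // multiSend per broker; on failure it refreshes broker metadata and retries up to
    // config.producerRetries times before throwing FailedToSendMessageException.
    handler.handle(events)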

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala Wed Feb  8 02:59:54 2012
@@ -16,30 +16,21 @@
 */
 package kafka.producer.async
 
-import java.util.Properties
-import kafka.producer.SyncProducer
-import kafka.serializer.Encoder
+import kafka.producer.ProducerData
 
 /**
- * Handler that dispatches the batched data from the queue of the
- * asynchronous producer.
+ * Handler that dispatches the batched data from the queue.
  */
-trait EventHandler[T] {
-  /**
-   * Initializes the event handler using a Properties object
-   * @param props the properties used to initialize the event handler
-  */
-  def init(props: Properties) {}
+trait EventHandler[K,V] {
 
   /**
    * Callback to dispatch the batched data and send it to a Kafka server
    * @param events the data sent to the producer
-   * @param producer the low-level producer used to send the data
   */
-  def handle(events: Seq[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T])
+  def handle(events: Seq[ProducerData[K,V]])
 
   /**
    * Cleans up and shuts down the event handler
   */
-  def close {}
+  def close
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Feb  8 02:59:54 2012
@@ -20,32 +20,21 @@ package kafka.producer.async
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ListBuffer
-import kafka.serializer.Encoder
-import kafka.producer.SyncProducer
+import kafka.producer.ProducerData
 
-private[async] class ProducerSendThread[T](val threadName: String,
-                                           val queue: BlockingQueue[QueueItem[T]],
-                                           val serializer: Encoder[T],
-                                           val underlyingProducer: SyncProducer,
-                                           val handler: EventHandler[T],
-                                           val cbkHandler: CallbackHandler[T],
-                                           val queueTime: Long,
-                                           val batchSize: Int,
-                                           val shutdownCommand: Any) extends Thread(threadName) with Logging {
+class ProducerSendThread[K,V](val threadName: String,
+                              val queue: BlockingQueue[ProducerData[K,V]],
+                              val handler: EventHandler[K,V],
+                              val queueTime: Long,
+                              val batchSize: Int) extends Thread(threadName) with Logging {
 
   private val shutdownLatch = new CountDownLatch(1)
+  private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
 
   override def run {
 
     try {
-      val remainingEvents = processEvents
-      debug("Remaining events = " + remainingEvents.size)
-
-      // handle remaining events
-      if(remainingEvents.size > 0) {
-        debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size))
-        tryToHandle(remainingEvents)
-      }
+      processEvents
     }catch {
       case e => error("Error in sending events: ", e)
     }finally {
@@ -53,34 +42,30 @@ private[async] class ProducerSendThread[
     }
   }
 
-  def awaitShutdown = shutdownLatch.await
-
   def shutdown = {
-    handler.close
-    info("Shutdown thread complete")
+    info("Beging shutting down ProducerSendThread")
+    queue.put(shutdownCommand)
+    shutdownLatch.await
+    info("Shutdown ProducerSendThread complete")
   }
 
-  private def processEvents(): Seq[QueueItem[T]] = {
+  private def processEvents() {
     var lastSend = SystemTime.milliseconds
-    var events = new ListBuffer[QueueItem[T]]
+    var events = new ListBuffer[ProducerData[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
     Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
-                      .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
+                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
       currentQueueItem =>
         val elapsed = (SystemTime.milliseconds - lastSend)
         // check if the queue time is reached. This happens when the poll method above returns after a timeout and
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
-          trace("Dequeued item for topic %s and partition %d"
-              .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
-          // handle the dequeued current item
-          if(cbkHandler != null)
-            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
-          else
-            events += currentQueueItem
+          trace("Dequeued item for topic %s, partition key: %s, data: %s"
+              .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
+          events += currentQueueItem
 
           // check if the batch size is reached
           full = events.size >= batchSize
@@ -91,36 +76,22 @@ private[async] class ProducerSendThread[
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
-          events = new ListBuffer[QueueItem[T]]
+          events = new ListBuffer[ProducerData[K,V]]
         }
     }
     if(queue.size > 0)
       throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
         .format(queue.size))
-    if(cbkHandler != null) {
-      info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
-      val addedEvents = cbkHandler.lastBatchBeforeClose
-      logEvents("last batch before close", addedEvents)
-      events = events ++ addedEvents
-    }
-    events
   }
 
-  def tryToHandle(events: Seq[QueueItem[T]]) {
+  def tryToHandle(events: Seq[ProducerData[K,V]]) {
     try {
       debug("Handling " + events.size + " events")
       if(events.size > 0)
-        handler.handle(events, underlyingProducer, serializer)
+        handler.handle(events)
     }catch {
-      case e: Exception => error("Error in handling batch of " + events.size + " events", e)
+      case e => error("Error in handling batch of " + events.size + " events", e)
     }
   }
 
-  private def logEvents(tag: String, events: Iterable[QueueItem[T]]) {
-    if(logger.isTraceEnabled) {
-      trace("events for " + tag + ":")
-      for (event <- events)
-        trace(event.getData.toString)
-    }
-  }
 }
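
A minimal wiring sketch for the reworked send thread (illustrative; in practice Producer creates the queue and thread, and `handler` is an assumed EventHandler instance). The thread dispatches a batch once either batchSize items have been dequeued or queueTime ms have elapsed, and shutdown now works by enqueuing a sentinel ProducerData that is matched by reference (ne):

    import java.util.concurrent.LinkedBlockingQueue
    import kafka.producer.ProducerData
    import kafka.producer.async.ProducerSendThread

    val queue = new LinkedBlockingQueue[ProducerData[String, String]](10000)
    val sendThread = new ProducerSendThread[String, String](
      "ProducerSendThread-example", queue, handler, /* queueTime = */ 5000L, /* batchSize = */ 200)
    sendThread.setDaemon(true)
    sendThread.start()

    queue.put(new ProducerData[String, String]("test-topic", Array("hello")))
    sendThread.shutdown   // enqueues the sentinel and waits on the shutdown latch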

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Wed Feb  8 02:59:54 2012
@@ -20,9 +20,7 @@ package kafka.tools
 import joptsimple.OptionParser
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
-import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.DefaultEncoder
-import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
 import kafka.consumer._
 import kafka.utils.{ZKStringSerializer, Logging}
 import kafka.api.OffsetRequest
@@ -171,9 +169,7 @@ object ReplayLogProducer extends Logging
       props.put("producer.type", "async")
 
     val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder,
-                                                  new DefaultEventHandler[Message](producerConfig, null),
-                                                  null, new DefaultPartitioner[Message])
+    val producer = new Producer[Message, Message](producerConfig)
 
     override def run() {
       info("Starting consumer thread..")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Feb  8 02:59:54 2012
@@ -24,17 +24,22 @@ import java.util.concurrent.atomic._
 import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
-import java.util.Properties
 import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
+import java.util.{Random, Properties}
 
 /**
  * Helper functions!
  */
 object Utils extends Logging {
+  val random = new Random
+
+  def getNextRandomInt(): Int = random.nextInt
   
+  def getNextRandomInt(upper: Int): Int = random.nextInt(upper)
+
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
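
The new random helpers back the key-less path in DefaultEventHandler.getPartition. A small illustration (assumed values):

    import kafka.utils.Utils

    // With no partition key, the handler picks a random partition in [0, numPartitions).
    val numPartitions = 4
    val partition = Utils.getNextRandomInt(numPartitions)
    assert(partition >= 0 && partition < numPartitions)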

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Feb  8 02:59:54 2012
@@ -66,7 +66,7 @@ class ZookeeperConsumerConnectorTest ext
     // also the iterator should support re-entrant, so loop it twice
     for (i <- 0 until  2) {
       try {
-        getMessages(nMessages*2, topicMessageStreams0)
+        getMessagesSortedByChecksum(nMessages*2, topicMessageStreams0)
         fail("should get an exception")
       }
       catch {
@@ -84,7 +84,7 @@ class ZookeeperConsumerConnectorTest ext
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
     // commit consumed offsets
     zkConsumerConnector1.commitOffsets
@@ -97,8 +97,8 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2")
     Thread.sleep(200)
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
     val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages2, receivedMessages2)
 
@@ -111,8 +111,8 @@ class ZookeeperConsumerConnectorTest ext
     Thread.sleep(200)
     val sentMessages3 = sendMessages(nMessages, "batch3")
     Thread.sleep(200)
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages3, receivedMessages3)
 
@@ -135,7 +135,7 @@ class ZookeeperConsumerConnectorTest ext
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
     // commit consumed offsets
     zkConsumerConnector1.commitOffsets
@@ -149,8 +149,8 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
     Thread.sleep(200)
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
     val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages2, receivedMessages2)
 
@@ -164,8 +164,8 @@ class ZookeeperConsumerConnectorTest ext
     Thread.sleep(200)
     val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
     Thread.sleep(200)
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages3, receivedMessages3)
 
@@ -195,13 +195,13 @@ class ZookeeperConsumerConnectorTest ext
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
-    getMessages(100, topicMessageStreams0)
+    getMessagesSortedByChecksum(100, topicMessageStreams0)
     zkConsumerConnector0.shutdown
     // at this point, only some part of the message set was consumed. So consumed offset should still be 0
     // also fetched offset should be 0
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
-    val receivedMessages = getMessages(400, topicMessageStreams1)
+    val receivedMessages = getMessagesSortedByChecksum(400, topicMessageStreams1)
     val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
     val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sortedSentMessages, sortedReceivedMessages)
@@ -268,19 +268,8 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
-    var messages: List[Message] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
-      for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for (i <- 0 until nMessagesPerThread) {
-          assertTrue(iterator.hasNext)
-          val message = iterator.next
-          messages ::= message
-          debug("received message: " + Utils.toString(message.payload, "UTF-8"))
-        }
-      }
-    }
+  def getMessagesSortedByChecksum(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
+    val messages = TestUtils.getConsumedMessages(nMessagesPerThread, topicMessageStreams)
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 }
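
The inlined draining loop above moved into TestUtils. Its presumed shape, reconstructed from the removed code (the actual helper in TestUtils may differ in detail):

    import junit.framework.Assert.assertTrue
    import kafka.consumer.KafkaMessageStream
    import kafka.message.Message
    import scala.collection.Map

    def getConsumedMessages(nMessagesPerThread: Int,
                            topicMessageStreams: Map[String, List[KafkaMessageStream[Message]]]): List[Message] = {
      var messages: List[Message] = Nil
      // Pull exactly nMessagesPerThread messages from every stream of every topic.
      for ((topic, messageStreams) <- topicMessageStreams; messageStream <- messageStreams) {
        val iterator = messageStream.iterator
        for (i <- 0 until nMessagesPerThread) {
          assertTrue(iterator.hasNext)
          messages ::= iterator.next
        }
      }
      messages
    }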

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Wed Feb  8 02:59:54 2012
@@ -52,7 +52,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+    stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
     Thread.sleep(200)
 
     var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
@@ -73,7 +73,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+    stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
     Thread.sleep(200)
 
     var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
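
The updated tests exercise the key-less ProducerData constructor; both shapes, side by side (topic and payloads are placeholders):

    import kafka.producer.ProducerData

    // Keyed: routed through the configured Partitioner using "partition-key".
    val keyed = new ProducerData[String, String]("test-topic", "partition-key", Array("m1"))
    // Key-less: the event handler picks a random partition.
    val keyless = new ProducerData[String, String]("test-topic", Array("m2"))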

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Feb  8 02:59:54 2012
@@ -67,7 +67,7 @@ class ZookeeperConsumerConnectorTest ext
 
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
-    val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
+    val producer = new kafka.javaapi.producer.SyncProducer(TestUtils.createProducer("localhost", conf.port))
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x =>
         new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray