Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/15 23:15:26 UTC

svn commit: r1410055 [1/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/consum...

Author: jkreps
Date: Thu Nov 15 22:15:14 2012
New Revision: 1410055

URL: http://svn.apache.org/viewvc?rev=1410055&view=rev
Log:
KAFKA-544 Store the key given to the producer in the message. Expose this key in the consumer. Patch reviewed by Jun.
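
For orientation, here is a minimal sketch of the keyed producer/consumer flow this patch introduces. The topic name and payloads are placeholders, and it assumes a Producer configured with string encoders for key and value plus an already-created high-level consumer connector:

    import kafka.producer.{Producer, KeyedMessage}
    import kafka.consumer.ConsumerConnector

    object KeyedRoundTripSketch {
      // Assumes `producer` was built with string encoders and `connector` is a
      // high-level consumer for the same cluster.
      def roundTrip(producer: Producer[String, String], connector: ConsumerConnector) {
        // The key ("user-42") is now stored inside the message itself.
        producer.send(new KeyedMessage[String, String]("clicks", "user-42", "page=/home"))

        // Without explicit decoders the streams carry raw Array[Byte] keys and values.
        val streams = connector.createMessageStreams(Map("clicks" -> 1))
        for(msg <- streams("clicks").head)
          println("key=" + new String(msg.key) + " value=" + new String(msg.message) +
                  " partition=" + msg.partition + " offset=" + msg.offset)
      }
    }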


Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Thu Nov 15 22:15:14 2012
@@ -27,9 +27,9 @@ import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
 import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
+import kafka.producer.KeyedMessage;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -81,18 +81,17 @@ public class DataGenerator {
 
 	public void run() throws Exception {
 
-		List<Message> list = new ArrayList<Message>();
+		List<KeyedMessage> list = new ArrayList<KeyedMessage>();
 		for (int i = 0; i < _count; i++) {
 			Long timestamp = RANDOM.nextLong();
 			if (timestamp < 0) timestamp = -timestamp;
 			byte[] bytes = timestamp.toString().getBytes("UTF8");
 			Message message = new Message(bytes);
-			list.add(message);
+			list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
 		}
 		// send events
 		System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-        ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
-		_producer.send(pd);
+		_producer.send(list);
 
 		// close the producer
 		_producer.close();

Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Thu Nov 15 22:15:14 2012
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
+import kafka.producer.KeyedMessage;
 import kafka.message.Message;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -33,7 +33,7 @@ public class KafkaRecordWriter<W extends
   protected Producer<Integer, Message> producer;
   protected String topic;
 
-  protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
+  protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
   protected int totalSize = 0;
   protected int queueSize;
 
@@ -57,7 +57,7 @@ public class KafkaRecordWriter<W extends
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
     Message msg = new Message(value.getBytes());
-    msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
+    msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Nov 15 22:15:14 2012
@@ -24,7 +24,7 @@ import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.serializer.StringDecoder
+import kafka.serializer._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
 
@@ -179,7 +179,7 @@ object ConsoleConsumer extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
     try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
       val iter = if(maxMessages >= 0)
         stream.slice(0, maxMessages)
       else
@@ -187,7 +187,7 @@ object ConsoleConsumer extends Logging {
 
       for(messageAndTopic <- iter) {
         try {
-          formatter.writeTo(messageAndTopic.message, System.out)
+          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -251,36 +251,14 @@ object MessageFormatter {
 }
 
 trait MessageFormatter {
-  def writeTo(message: Message, output: PrintStream)
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
   def init(props: Properties) {}
   def close() {}
 }
 
-class DecodedMessageFormatter extends MessageFormatter {
-  var topicStr: String = _
-  val decoder = new StringDecoder()
-
-  override def init(props: Properties) {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
-    else
-      topicStr = ""
-  }
-
-  def writeTo(message: Message, output: PrintStream) {
-    try {
-      output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
-    } catch {
-      case e => e.printStackTrace()
-    }
-  }
-}
-
 class NewlineMessageFormatter extends MessageFormatter {
-  def writeTo(message: Message, output: PrintStream) {
-    val payload = message.payload
-    output.write(payload.array, payload.arrayOffset, payload.limit)
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    output.write(value)
     output.write('\n')
   }
 }
@@ -296,8 +274,8 @@ class ChecksumMessageFormatter extends M
       topicStr = ""
   }
 
-  def writeTo(message: Message, output: PrintStream) {
-    val chksum = message.checksum
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    val chksum = new Message(value, key).checksum
     output.println(topicStr + "checksum:" + chksum)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -19,38 +19,53 @@ package kafka.consumer
 
 import scala.collection._
 import kafka.utils.Logging
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer._
 
 /**
  *  Main interface for consumer
  */
 trait ConsumerConnector {
+  
   /**
    *  Create a list of MessageStreams for each topic.
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
-   *  @param decoder Decoder to decode each Message to type T
    *  @return a map of (topic, list of  KafkaStream) pairs.
    *          The number of items in the list is #streams. Each stream supports
    *          an iterator over message/metadata pairs.
    */
-  def createMessageStreams[T](topicCountMap: Map[String,Int],
-                              decoder: Decoder[T] = new DefaultDecoder)
-    : Map[String,List[KafkaStream[T]]]
-
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
+  
+  /**
+   *  Create a list of MessageStreams for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param keyDecoder Decoder to decode the key portion of the message
+   *  @param valueDecoder Decoder to decode the value portion of the message
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
+   */
+  def createMessageStreams[K,V](topicCountMap: Map[String,Int],
+                                keyDecoder: Decoder[K],
+                                valueDecoder: Decoder[V])
+    : Map[String,List[KafkaStream[K,V]]]
+  
   /**
    *  Create a list of message streams for all topics that match a given filter.
    *
    *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
    *  @param numStreams Number of streams to return
-   *  @param decoder Decoder to decode each Message to type T
+   *  @param keyDecoder Decoder to decode the key portion of the message
+   *  @param valueDecoder Decoder to decode the value portion of the message
    *  @return a list of KafkaStream each of which provides an
    *          iterator over message/metadata pairs over allowed topics.
    */
-  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
-                                      numStreams: Int = 1,
-                                      decoder: Decoder[T] = new DefaultDecoder)
-    : Seq[KafkaStream[T]]
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+                                        numStreams: Int = 1,
+                                        keyDecoder: Decoder[K] = new DefaultDecoder(),
+                                        valueDecoder: Decoder[V] = new DefaultDecoder())
+    : Seq[KafkaStream[K,V]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
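
As a sketch of how the new two-decoder overload might be used, the snippet below builds KafkaStream[String, String] streams; the inline UTF-8 decoder is illustrative and not part of this patch:

    import kafka.consumer.{ConsumerConnector, KafkaStream}
    import kafka.serializer.Decoder

    object TypedStreamsSketch {
      // Illustrative decoder written against the new Decoder.fromBytes contract.
      val utf8 = new Decoder[String] {
        def fromBytes(bytes: Array[Byte]) = new String(bytes, "UTF-8")
      }

      // Keys and values are decoded independently, yielding typed streams.
      def typedStreams(connector: ConsumerConnector): List[KafkaStream[String, String]] =
        connector.createMessageStreams(Map("clicks" -> 2), utf8, utf8)("clicks")
    }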

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Nov 15 22:15:14 2012
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.utils.{IteratorTemplate, Logging}
+import kafka.utils.{IteratorTemplate, Logging, Utils}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
@@ -30,17 +30,18 @@ import kafka.common.{KafkaException, Mes
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
-                          consumerTimeoutMs: Int,
-                          private val decoder: Decoder[T],
-                          val enableShallowIterator: Boolean)
-  extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
+class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
+                             consumerTimeoutMs: Int,
+                             private val keyDecoder: Decoder[K],
+                             private val valueDecoder: Decoder[V],
+                             val enableShallowIterator: Boolean)
+  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
-  private var currentTopicInfo:PartitionTopicInfo = null
+  private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): MessageAndMetadata[T] = {
+  override def next(): MessageAndMetadata[K, V] = {
     val item = super.next()
     if(consumedOffset < 0)
       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
@@ -52,7 +53,7 @@ class ConsumerIterator[T](private val ch
     item
   }
 
-  protected def makeNext(): MessageAndMetadata[T] = {
+  protected def makeNext(): MessageAndMetadata[K, V] = {
     var currentDataChunk: FetchedDataChunk = null
     // if we don't have an iterator, get one
     var localCurrent = current.get()
@@ -103,7 +104,10 @@ class ConsumerIterator[T](private val ch
 
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
-    new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
+    val keyBuffer = item.message.key
+    val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
+    val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+    new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
   }
 
   def clearCurrentChunk() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala Thu Nov 15 22:15:14 2012
@@ -22,19 +22,20 @@ import java.util.concurrent.BlockingQueu
 import kafka.serializer.Decoder
 import kafka.message.MessageAndMetadata
 
-class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
-                     consumerTimeoutMs: Int,
-                     private val decoder: Decoder[T],
-                     val enableShallowIterator: Boolean)
-   extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
+                        consumerTimeoutMs: Int,
+                        private val keyDecoder: Decoder[K],
+                        private val valueDecoder: Decoder[V],
+                        val enableShallowIterator: Boolean)
+   extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
-  private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
+  private val iter: ConsumerIterator[K,V] =
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
 
   /**
    *  Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator[T] = iter
+  def iterator(): ConsumerIterator[K,V] = iter
 
   /**
    * This method clears the queue being iterated during the consumer rebalancing. This is mainly

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -28,13 +28,14 @@ import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
-import kafka.serializer.Decoder
+import kafka.serializer._
 import kafka.utils.ZkUtils._
 import kafka.common._
 import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
+import kafka.producer.ProducerConfig
 
 
 /**
@@ -120,17 +121,23 @@ private[kafka] class ZookeeperConsumerCo
   KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ConsumerConfig) = this(config, true)
+  
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = 
+    createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
 
-  def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
-      : Map[String,List[KafkaStream[T]]] = {
+  def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
+      : Map[String, List[KafkaStream[K,V]]] = {
     if (messageStreamCreated.getAndSet(true))
       throw new RuntimeException(this.getClass.getSimpleName +
                                    " can create message streams at most once")
-    consume(topicCountMap, decoder)
+    consume(topicCountMap, keyDecoder, valueDecoder)
   }
 
-  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
-    val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, 
+                                        numStreams: Int, 
+                                        keyDecoder: Decoder[K] = new DefaultDecoder(), 
+                                        valueDecoder: Decoder[V] = new DefaultDecoder()) = {
+    val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
     wildcardStreamsHandler.streams
   }
 
@@ -173,8 +180,8 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
-      : Map[String,List[KafkaStream[T]]] = {
+  def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
+      : Map[String,List[KafkaStream[K,V]]] = {
     debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
@@ -187,8 +194,8 @@ private[kafka] class ZookeeperConsumerCo
     val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        val stream = new KafkaStream[T](
-          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        val stream = new KafkaStream[K,V](
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
         (queue, stream)
       })
     ).flatten.toList
@@ -197,7 +204,7 @@ private[kafka] class ZookeeperConsumerCo
     registerConsumerInZK(dirs, consumerIdString, topicCount)
     reinitializeConsumer(topicCount, queuesAndStreams)
 
-    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
+    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
   }
 
   // this API is used by unit tests only
@@ -293,7 +300,7 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
-                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
@@ -473,7 +480,7 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
-                                       messageStreams: Map[String,List[KafkaStream[_]]],
+                                       messageStreams: Map[String,List[KafkaStream[_,_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
@@ -496,7 +503,7 @@ private[kafka] class ZookeeperConsumerCo
 
     private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                                    queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                                   messageStreams: Map[String,List[KafkaStream[_]]]) {
+                                   messageStreams: Map[String,List[KafkaStream[_,_]]]) {
 
       // Clear all but the currently iterated upon chunk in the consumer thread's queue
       queuesTobeCleared.foreach(_.clear)
@@ -510,7 +517,7 @@ private[kafka] class ZookeeperConsumerCo
 
     }
 
-    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
+    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
@@ -610,17 +617,17 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  private def reinitializeConsumer[T](
+  private def reinitializeConsumer[K,V](
       topicCount: TopicCount,
-      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
 
     val dirs = new ZKGroupDirs(config.groupId)
 
     // listener to consumer and partition changes
     if (loadBalancerListener == null) {
-      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
       loadBalancerListener = new ZKRebalancerListener(
-        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
     }
 
     // register listener for session expired event
@@ -690,9 +697,10 @@ private[kafka] class ZookeeperConsumerCo
     loadBalancerListener.syncedRebalance()
   }
 
-  class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+  class WildcardStreamsHandler[K,V](topicFilter: TopicFilter,
                                   numStreams: Int,
-                                  decoder: Decoder[T])
+                                  keyDecoder: Decoder[K],
+                                  valueDecoder: Decoder[V])
                                 extends TopicEventHandler[String] {
 
     if (messageStreamCreated.getAndSet(true))
@@ -702,8 +710,11 @@ private[kafka] class ZookeeperConsumerCo
     private val wildcardQueuesAndStreams = (1 to numStreams)
       .map(e => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        val stream = new KafkaStream[T](
-          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        val stream = new KafkaStream[K,V](queue, 
+                                          config.consumerTimeoutMs, 
+                                          keyDecoder, 
+                                          valueDecoder, 
+                                          config.enableShallowIterator)
         (queue, stream)
     }).toList
 
@@ -760,7 +771,7 @@ private[kafka] class ZookeeperConsumerCo
         reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
     }
 
-    def streams: Seq[KafkaStream[T]] =
+    def streams: Seq[KafkaStream[K,V]] =
       wildcardQueuesAndStreams.map(_._2)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu Nov 15 22:15:14 2012
@@ -20,7 +20,6 @@ package kafka.javaapi.consumer;
 
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
-import kafka.message.Message;
 import kafka.serializer.Decoder;
 
 import java.util.List;
@@ -36,10 +35,10 @@ public interface ConsumerConnector {
    *          The number of items in the list is #streams. Each stream supports
    *          an iterator over message/metadata pairs.
    */
-  public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
-      Map<String, Integer> topicCountMap, Decoder<T> decoder);
-  public Map<String, List<KafkaStream<Message>>> createMessageStreams(
-      Map<String, Integer> topicCountMap);
+  public <K,V> Map<String, List<KafkaStream<K,V>>> 
+    createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+  
+  public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
 
   /**
    *  Create a list of MessageAndTopicStreams containing messages of type T.
@@ -47,16 +46,17 @@ public interface ConsumerConnector {
    *  @param topicFilter a TopicFilter that specifies which topics to
    *                    subscribe to (encapsulates a whitelist or a blacklist).
    *  @param numStreams the number of message streams to return.
-   *  @param decoder a decoder that converts from Message to T
+   *  @param keyDecoder a decoder that decodes the message key
+   *  @param valueDecoder a decoder that decodes the message itself
    *  @return a list of KafkaStream. Each stream supports an
    *          iterator over its MessageAndMetadata elements.
    */
-  public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
-      TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
-  public List<KafkaStream<Message>> createMessageStreamsByFilter(
-      TopicFilter topicFilter, int numStreams);
-  public List<KafkaStream<Message>> createMessageStreamsByFilter(
-      TopicFilter topicFilter);
+  public <K,V> List<KafkaStream<K,V>> 
+    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+  
+  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
+  
+  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -17,7 +17,7 @@
 package kafka.javaapi.consumer
 
 import kafka.message.Message
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer._
 import kafka.consumer._
 import scala.collection.JavaConversions.asList
 
@@ -59,7 +59,7 @@ import scala.collection.JavaConversions.
 */
 
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
-                                 val enableFetcher: Boolean) // for testing only
+                                                val enableFetcher: Boolean) // for testing only
     extends ConsumerConnector {
 
   private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
@@ -67,38 +67,37 @@ private[kafka] class ZookeeperConsumerCo
   def this(config: ConsumerConfig) = this(config, true)
 
  // for java client
-  def createMessageStreams[T](
+  def createMessageStreams[K,V](
         topicCountMap: java.util.Map[String,java.lang.Integer],
-        decoder: Decoder[T])
-      : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
+        keyDecoder: Decoder[K],
+        valueDecoder: Decoder[V])
+      : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
-    val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
+    val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
+    val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
+      var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
     }
     ret
   }
-
-  def createMessageStreams(
-        topicCountMap: java.util.Map[String,java.lang.Integer])
-      : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
-    createMessageStreams(topicCountMap, new DefaultDecoder)
-
-  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
-    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
-
-  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
-    createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
-
-  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
-    createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
-
+  
+  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
+    createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
+    
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
+    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = 
+    createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
+    
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) = 
+    createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
+    
   def commitOffsets() {
     underlying.commitOffsets
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Thu Nov 15 22:15:14 2012
@@ -18,6 +18,7 @@
 package kafka.javaapi.producer
 
 import kafka.producer.ProducerConfig
+import kafka.producer.KeyedMessage
 
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -27,20 +28,17 @@ class Producer[K,V](private val underlyi
    * synchronous or the asynchronous producer
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
-  def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
-    import collection.JavaConversions._
-    underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
-                                                         asBuffer(producerData.getData)))
+  def send(message: KeyedMessage[K,V]) {
+    underlying.send(message)
   }
 
   /**
    * Use this API to send data to multiple topics
    * @param producerData list of producer data objects that encapsulate the topic, key and message data
    */
-  def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
+  def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._
-    underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
-                                                         asBuffer(pd.getData))): _*)
+    underlying.send(asBuffer(messages):_*)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala Thu Nov 15 22:15:14 2012
@@ -17,5 +17,5 @@
 
 package kafka.message
 
-case class MessageAndMetadata[T](message: T, topic: String = "")
+case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Thu Nov 15 22:15:14 2012
@@ -59,6 +59,14 @@ class BlockingChannel( val host: String,
       writeChannel = channel
       readChannel = Channels.newChannel(channel.socket().getInputStream)
       connected = true
+      // settings may not match what we requested above
+      val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+      debug(msg.format(channel.socket.getSoTimeout,
+                       readTimeoutMs,
+                       channel.socket.getReceiveBufferSize, 
+                       readBufferSize,
+                       channel.socket.getSendBufferSize,
+                       writeBufferSize))
     }
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu Nov 15 22:15:14 2012
@@ -111,7 +111,7 @@ object ConsoleProducer { 
     do { 
       message = reader.readMessage()
       if(message != null)
-        producer.send(new ProducerData(topic, message))
+        producer.send(new KeyedMessage(topic, message))
     } while(message != null)
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala Thu Nov 15 22:15:14 2012
@@ -19,7 +19,9 @@ package kafka.producer
 
 import kafka.utils.Utils
 
-private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
+import kafka.utils._
+
+private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
   private val random = new java.util.Random
   
   def partition(key: T, numPartitions: Int): Int = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Nov 15 22:15:14 2012
@@ -87,8 +87,7 @@ class KafkaLog4jAppender extends Appende
     }
     else this.layout.format(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val messageData : ProducerData[String, String] =
-      new ProducerData[String, String](topic, message)
+    val messageData = new KeyedMessage[String, String](topic, message)
     producer.send(messageData);
   }
 

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala?rev=1410055&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala Thu Nov 15 22:15:14 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+/**
+ * A topic, key, and value
+ */
+case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
+  if(topic == null)
+    throw new IllegalArgumentException("Topic cannot be null.")
+  
+  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
+  
+  def hasKey = key != null
+}
\ No newline at end of file
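
A brief usage sketch of the new case class (topic and payload values are placeholders):

    import kafka.producer.KeyedMessage

    object KeyedMessageSketch {
      // Keyed: the key drives partitioning and is stored alongside the message.
      val keyed   = new KeyedMessage[String, String]("clicks", "user-42", "page=/home")
      // Unkeyed, via the auxiliary constructor: key is null, so hasKey is false.
      val unkeyed = new KeyedMessage[String, String]("clicks", "page=/home")
      // Passing a null topic throws IllegalArgumentException.
      assert(keyed.hasKey && !unkeyed.hasKey)
    }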

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala Thu Nov 15 22:15:14 2012
@@ -16,6 +16,13 @@
 */
 package kafka.producer
 
+/**
+ * A partitioner controls the mapping between user-provided keys and kafka partitions. Users can implement a custom
+ * partitioner to change this mapping.
+ * 
+ * Implementations will be constructed via reflection and are required to have a constructor that takes a single 
+ * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
+ */
 trait Partitioner[T] {
   /**
    * Uses the key to calculate a partition bucket id for routing
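
Given the reflection contract documented above, a user-supplied partitioner could look like the following sketch (the hashing scheme is illustrative only):

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    // Instantiated via reflection, so the VerifiableProperties constructor is required.
    class ModuloPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
      def partition(key: String, numPartitions: Int): Int =
        // Mask the sign bit so the result is always a valid, non-negative partition id.
        (key.hashCode & 0x7fffffff) % numPartitions
    }

It would then be wired in through the producer's partitioner class configuration property.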

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Thu Nov 15 22:15:14 2012
@@ -33,7 +33,7 @@ extends Logging {
   if (config.batchSize > config.queueSize)
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
 
-  private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
+  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
 
   private val random = new Random
   private var sync: Boolean = true
@@ -43,9 +43,12 @@ extends Logging {
     case "async" =>
       sync = false
       val asyncProducerID = random.nextInt(Int.MaxValue)
-      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
-        eventHandler, config.queueTime, config.batchSize)
-      producerSendThread.start
+      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, 
+                                                       queue,
+                                                       eventHandler, 
+                                                       config.queueTime, 
+                                                       config.batchSize)
+      producerSendThread.start()
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
@@ -54,8 +57,9 @@ extends Logging {
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass),
-                                      Utils.createObject[Encoder[V]](config.serializerClass),
+                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                       new ProducerPool(config)))
 
   /**
@@ -63,36 +67,36 @@ extends Logging {
    * synchronous or the asynchronous producer
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
-  def send(producerData: ProducerData[K,V]*) {
+  def send(messages: KeyedMessage[K,V]*) {
     if (hasShutdown.get)
       throw new ProducerClosedException
-    recordStats(producerData: _*)
+    recordStats(messages)
     sync match {
-      case true => eventHandler.handle(producerData)
-      case false => asyncSend(producerData: _*)
+      case true => eventHandler.handle(messages)
+      case false => asyncSend(messages)
     }
   }
 
-  private def recordStats(producerData: ProducerData[K,V]*) {
-    for (data <- producerData) {
-      ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
-      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
+  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
+    for (message <- messages) {
+      ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
+      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
     }
   }
 
-  private def asyncSend(producerData: ProducerData[K,V]*) {
-    for (data <- producerData) {
+  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
+    for (message <- messages) {
       val added = config.enqueueTimeoutMs match {
         case 0  =>
-          queue.offer(data)
+          queue.offer(message)
         case _  =>
           try {
             config.enqueueTimeoutMs < 0 match {
             case true =>
-              queue.put(data)
+              queue.put(message)
               true
             case _ =>
-              queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+              queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
             }
           }
           catch {
@@ -102,10 +106,10 @@ extends Logging {
       }
       if(!added) {
         AsyncProducerStats.droppedMessageRate.mark()
-        error("Event queue is full of unsent messages, could not send event: " + data.toString)
-        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
+        error("Event queue is full of unsent messages, could not send event: " + message.toString)
+        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
-        trace("Added to send queue an event: " + data.toString)
+        trace("Added to send queue an event: " + message.toString)
         trace("Remaining queue size: " + queue.remainingCapacity)
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Thu Nov 15 22:15:14 2012
@@ -38,6 +38,10 @@ trait AsyncProducerConfig {
   /** the number of messages batched at the producer */
   val batchSize = props.getInt("batch.size", 200)
 
-  /** the serializer class for events */
+  /** the serializer class for values */
   val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
+  
+  /** the serializer class for keys (defaults to the same as for values) */
+  val keySerializerClass = props.getString("key.serializer.class", serializerClass)
+  
 }
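
For illustration, the new key serializer setting might be supplied like this; the property names come from AsyncProducerConfig above, while the encoder class name is an assumed stock string encoder:

    import java.util.Properties

    object KeySerializerConfigSketch {
      val props = new Properties()
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      // If key.serializer.class is omitted it falls back to serializer.class.
      props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    }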

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Nov 15 22:15:14 2012
@@ -23,7 +23,7 @@ import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.{Seq, Map}
-import scala.collection.mutable.{ListBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 
@@ -31,6 +31,7 @@ import kafka.api.{TopicMetadata, Produce
 class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val partitioner: Partitioner[K],
                                private val encoder: Encoder[V],
+                               private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
                                private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
   extends EventHandler[K,V] with Logging {
@@ -41,13 +42,14 @@ class DefaultEventHandler[K,V](config: P
 
   private val lock = new Object()
 
-  def handle(events: Seq[ProducerData[K,V]]) {
+  def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
       serializedData.foreach{
-        pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
-              ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
-              ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+        keyed => 
+          val dataSize = keyed.message.payloadSize
+          ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
+          ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
@@ -57,7 +59,7 @@ class DefaultEventHandler[K,V](config: P
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.producerRetryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
-          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic).toSet))
+          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
           remainingRetries -= 1
           ProducerStats.resendRate.mark()
         }
@@ -70,24 +72,23 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
+  private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
         try {
-          for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
+          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
             if (logger.isTraceEnabled)
-              eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
-                .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+              messagesPerBrokerMap.foreach(partitionAndEvent => 
+                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
 
             val failedTopicPartitions = send(brokerid, messageSetPerBroker)
             failedTopicPartitions.foreach(topicPartition => {
-              eventsPerBrokerMap.get(topicPartition) match {
+              messagesPerBrokerMap.get(topicPartition) match {
                 case Some(data) => failedProduceRequests.appendAll(data)
                 case None => // nothing
-
               }
             })
           }
@@ -100,63 +101,61 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
-    val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
-    events.foreach {e =>
-      val serializedMessages = new ListBuffer[Message]
-      for (d <- e.getData) {
-        try {
-          serializedMessages += encoder.toMessage(d)
-        } catch {
-          case t =>
-            ProducerStats.serializationErrorRate.mark()
-            if (isSync)
-              throw t
-            else {
-              // currently, if in async mode, we just log the serialization error. We need to revisit
-              // this when doing kafka-496
-              error("Error serializing message ", t)
-            }
-        }
+  def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
+    val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
+    events.map{e =>
+      try {
+        if(e.hasKey)
+          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+        else
+          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
+      } catch {
+        case t =>
+          ProducerStats.serializationErrorRate.mark()
+          if (isSync) {
+            throw t
+          } else {
+            // currently, if in async mode, we just log the serialization error. We need to revisit
+            // this when doing kafka-496
+            error("Error serializing message ", t)
+          }
       }
-      if (serializedMessages.size > 0)
-        serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
     }
-    serializedProducerData
+    serializedMessages
   }
 
-  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
-    val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
+  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
+    val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
     try {
-      for (event <- events) {
-        val topicPartitionsList = getPartitionListForTopic(event)
+      for (message <- messages) {
+        val topicPartitionsList = getPartitionListForTopic(message)
         val totalNumPartitions = topicPartitionsList.length
 
-        val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+        val partitionIndex = getPartition(message.key, totalNumPartitions)
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
         val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
 
-        var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
+        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
         ret.get(leaderBrokerId) match {
           case Some(element) =>
-            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
+            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
           case None =>
-            dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
+            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
             ret.put(leaderBrokerId, dataPerBroker)
         }
 
-        val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
-        var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
+        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
         dataPerBroker.get(topicAndPartition) match {
           case Some(element) =>
-            dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
           case None =>
-            dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
             dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
         }
-        dataPerTopicPartition.append(event)
+        dataPerTopicPartition.append(message)
       }
       Some(ret)
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
@@ -166,13 +165,14 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[PartitionAndLeader] = {
-    debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
+  private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
+    debug("Getting the number of broker partitions registered for topic: " + m.topic)
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic)
     debug("Broker partitions registered for topic: %s are %s"
-      .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
+      .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+    if(totalNumPartitions == 0) 
+      throw new NoBrokersForPartitionException("Partition key = " + m.key)
     topicPartitionsList
   }
 
@@ -236,7 +236,7 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
      *  If the compression codec is anything other than NoCompressionCodec,
      *    Enable compression only for specified topics if any
@@ -244,32 +244,29 @@ class DefaultEventHandler[K,V](config: P
      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
 
-    val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
-      val topicAndPartition = e._1
-      val produceData = e._2
-      val messages = new ListBuffer[Message]
-      produceData.foreach(p => messages.appendAll(p.getData))
+    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+      val rawMessages = messages.map(_.message)
       ( topicAndPartition,
         config.compressionCodec match {
           case NoCompressionCodec =>
             trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
                 trace("Sending %d messages with compression codec %d to %s"
                   .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndPartition.topic)) {
                   trace("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                  new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+                  new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                 }
                 else {
                   trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
                     .format(messages.size, topicAndPartition, config.compressedTopics.toString))
-                  new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+                  new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                 }
             }
         }
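
To make the revised serialize() path concrete, here is a small standalone sketch (not from the
commit) of the per-message transformation it performs, assuming StringEncoder is used for both
keys and values; the keyed branch mirrors hasKey, the keyless branch omits the message key:

    // Illustrative only: mirrors the keyed/keyless branches of serialize() above.
    import kafka.message.Message
    import kafka.serializer.StringEncoder

    val keyEncoder = new StringEncoder()
    val valueEncoder = new StringEncoder()

    // A keyed input becomes a Message carrying both the encoded key and the encoded value;
    // a keyless input becomes a Message carrying only the encoded value.
    def toMessage(key: String, value: String): Message =
      if (key != null)
        new Message(key = keyEncoder.toBytes(key), bytes = valueEncoder.toBytes(value))
      else
        new Message(bytes = valueEncoder.toBytes(value))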

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala Thu Nov 15 22:15:14 2012
@@ -16,7 +16,7 @@
 */
 package kafka.producer.async
 
-import kafka.producer.ProducerData
+import kafka.producer.KeyedMessage
 
 /**
  * Handler that dispatches the batched data from the queue.
@@ -27,7 +27,7 @@ trait EventHandler[K,V] {
    * Callback to dispatch the batched data and send it to a Kafka server
    * @param events the data sent to the producer
   */
-  def handle(events: Seq[ProducerData[K,V]])
+  def handle(events: Seq[KeyedMessage[K,V]])
 
   /**
    * Cleans up and shuts down the event handler

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu Nov 15 22:15:14 2012
@@ -20,28 +20,25 @@ package kafka.producer.async
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ListBuffer
-import kafka.producer.ProducerData
+import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 
 class ProducerSendThread[K,V](val threadName: String,
-                              val queue: BlockingQueue[ProducerData[K,V]],
+                              val queue: BlockingQueue[KeyedMessage[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
                               val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
-  private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
+  private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(
-    "ProducerQueueSize-" + getId,
-    new Gauge[Int] {
-      def getValue = queue.size
-    }
-  )
+  newGauge("ProducerQueueSize-" + getId,
+          new Gauge[Int] {
+            def getValue = queue.size
+          })
 
   override def run {
-
     try {
       processEvents
     }catch {
@@ -60,7 +57,7 @@ class ProducerSendThread[K,V](val thread
 
   private def processEvents() {
     var lastSend = SystemTime.milliseconds
-    var events = new ListBuffer[ProducerData[K,V]]
+    var events = new ListBuffer[KeyedMessage[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
@@ -72,12 +69,8 @@ class ProducerSendThread[K,V](val thread
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
-          if(currentQueueItem.getKey == null)
-            trace("Dequeued item for topic %s, no partition key, data: %s"
-              .format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
-          else
-            trace("Dequeued item for topic %s, partition key: %s, data: %s"
-              .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
+          trace("Dequeued item for topic %s, partition key: %s, data: %s"
+              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
           events += currentQueueItem
         }
 
@@ -85,12 +78,14 @@ class ProducerSendThread[K,V](val thread
         full = events.size >= batchSize
 
         if(full || expired) {
-          if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
-          if(full) debug("Batch full. Sending..")
+          if(expired)
+            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
+          if(full)
+            debug("Batch full. Sending..")
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
-          events = new ListBuffer[ProducerData[K,V]]
+          events = new ListBuffer[KeyedMessage[K,V]]
         }
     }
     // send the last batch of events
@@ -100,7 +95,7 @@ class ProducerSendThread[K,V](val thread
         .format(queue.size))
   }
 
-  def tryToHandle(events: Seq[ProducerData[K,V]]) {
+  def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
     val size = events.size
     try {
       debug("Handling " + size + " events")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala Thu Nov 15 22:15:14 2012
@@ -17,21 +17,44 @@
 
 package kafka.serializer
 
-import kafka.message.Message
+import kafka.message._
+import kafka.utils.VerifiableProperties
 
+/**
+ * A decoder is a method of turning byte arrays into objects.
+ * An implementation is required to provide a constructor that
+ * takes a VerifiableProperties instance.
+ */
 trait Decoder[T] {
-  def toEvent(message: Message):T
+  def fromBytes(bytes: Array[Byte]): T
 }
 
-class DefaultDecoder extends Decoder[Message] {
-  def toEvent(message: Message):Message = message
+/**
+ * The default implementation does nothing, just returns the same byte array it takes in.
+ */
+class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
+  def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
 }
 
-class StringDecoder extends Decoder[String] {
-  def toEvent(message: Message):String = {
-    val buf = message.payload
-    val arr = new Array[Byte](buf.remaining)
-    buf.get(arr)
-    new String(arr)
+/**
+ * Decode messages without any key
+ */
+class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
+  def fromBytes(bytes: Array[Byte]) = new Message(bytes)
+}
+
+/**
+ * The string decoder translates bytes into strings. It uses UTF8 by default but takes
+ * an optional property serializer.encoding to control this.
+ */
+class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+  val encoding = 
+    if(props == null)
+      "UTF8" 
+    else
+      props.getString("serializer.encoding", "UTF8")
+      
+  def fromBytes(bytes: Array[Byte]): String = {
+    new String(bytes, encoding)
   }
 }
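
A brief usage sketch (illustrative, not from this commit) of the new Decoder contract,
assuming VerifiableProperties can be built directly from a java.util.Properties instance:

    // Illustrative: decode a UTF8 byte array with the new fromBytes API.
    import java.util.Properties
    import kafka.serializer.StringDecoder
    import kafka.utils.VerifiableProperties

    val decoderProps = new Properties()
    decoderProps.put("serializer.encoding", "UTF8")
    val decoder = new StringDecoder(new VerifiableProperties(decoderProps))
    val text: String = decoder.fromBytes("hello".getBytes("UTF8"))   // yields "hello"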

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala Thu Nov 15 22:15:14 2012
@@ -17,16 +17,44 @@
 
 package kafka.serializer
 
-import kafka.message.Message
+import kafka.utils.VerifiableProperties
+import kafka.message._
+import kafka.utils.Utils
 
+/**
+ * An encoder is a method of turning objects into byte arrays.
+ * An implementation is required to provide a constructor that
+ * takes a VerifiableProperties instance.
+ */
 trait Encoder[T] {
-  def toMessage(event: T):Message
+  def toBytes(t: T): Array[Byte]
 }
 
-class DefaultEncoder extends Encoder[Message] {
-  override def toMessage(event: Message):Message = event
+/**
+ * The default implementation is a no-op: it just returns the same array it takes in.
+ */
+class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
+  override def toBytes(value: Array[Byte]): Array[Byte] = value
 }
 
-class StringEncoder extends Encoder[String] {
-  override def toMessage(event: String):Message = new Message(event.getBytes)
+class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
+  override def toBytes(value: T): Array[Byte] = null
+}
+
+/**
+ * The string encoder takes an optional parameter serializer.encoding which controls
+ * the character set used in encoding the string into bytes.
+ */
+class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
+  val encoding = 
+    if(props == null) 
+      "UTF8" 
+    else 
+      props.getString("serializer.encoding", "UTF8")
+  
+  override def toBytes(s: String): Array[Byte] = 
+    if(s == null)
+      null
+    else
+      s.getBytes(encoding)
 }
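
For illustration (not part of the commit), a hypothetical custom encoder that satisfies the
constructor contract documented above: a single VerifiableProperties constructor argument plus
a toBytes implementation:

    // Hypothetical encoder: upper-cases strings before encoding them as UTF8 bytes.
    import kafka.serializer.Encoder
    import kafka.utils.VerifiableProperties

    class UpperCaseStringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
      override def toBytes(s: String): Array[Byte] =
        if (s == null) null else s.toUpperCase.getBytes("UTF8")
    }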

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java Thu Nov 15 22:15:14 2012
@@ -19,8 +19,8 @@ package kafka.tools;
 
 import joptsimple.*;
 import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
+import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.utils.Utils;
 import scala.collection.Iterator;
@@ -282,7 +282,7 @@ public class KafkaMigrationTool
           ((ByteBuffer)payload_07).get(bytes);
           Message message_08 = new Message(bytes);
           logger.debug(String.format("Send kafka 08 message of size %d to topic %s", message_08.size(), topic));
-          ProducerData<String, Message> producerData = new ProducerData((String)topic, message_08);
+          KeyedMessage<String, Message> producerData = new KeyedMessage((String)topic, null, message_08);
           Producer nextProducer = producerCircularIterator.next();
           nextProducer.send(producerData);
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala Thu Nov 15 22:15:14 2012
@@ -20,10 +20,11 @@ package kafka.tools
 import kafka.message.Message
 import joptsimple.OptionParser
 import kafka.utils.{Utils, CommandLineUtils, Logging}
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
 import kafka.consumer._
+import kafka.serializer._
 
 
 object MirrorMaker extends Logging {
@@ -92,7 +93,7 @@ object MirrorMaker extends Logging {
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
       val config = new ProducerConfig(
         Utils.loadProps(options.valueOf(producerConfigOpt)))
-      new Producer[Null, Message](config)
+      new Producer[Array[Byte], Array[Byte]](config)
     })
 
     val threads = {
@@ -113,11 +114,9 @@ object MirrorMaker extends Logging {
         new Blacklist(options.valueOf(blacklistOpt))
 
       val streams =
-        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
+        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
 
-      streams.flatten.zipWithIndex.map(streamAndIndex => {
-        new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
-      })
+      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
     }
 
     threads.foreach(_.start())
@@ -125,8 +124,8 @@ object MirrorMaker extends Logging {
     threads.foreach(_.awaitShutdown())
   }
 
-  class MirrorMakerThread(stream: KafkaStream[Message],
-                          producers: Seq[Producer[Null, Message]],
+  class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
+                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
@@ -140,16 +139,14 @@ object MirrorMaker extends Logging {
       try {
         for (msgAndMetadata <- stream) {
           val producer = producerSelector.next()
-          val pd = new ProducerData[Null, Message](
+          val pd = new KeyedMessage[Array[Byte], Array[Byte]](
             msgAndMetadata.topic, msgAndMetadata.message)
           producer.send(pd)
         }
-      }
-      catch {
+      } catch {
         case e =>
           fatal("%s stream unexpectedly exited.", e)
-      }
-      finally {
+      } finally {
         shutdownLatch.countDown()
         info("Stopped thread %s.".format(threadName))
       }
@@ -158,8 +155,7 @@ object MirrorMaker extends Logging {
     def awaitShutdown() {
       try {
         shutdownLatch.await()
-      }
-      catch {
+      } catch {
         case e: InterruptedException => fatal(
           "Shutdown of thread %s interrupted. This might leak data!"
                   .format(threadName))
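
As a rough sketch of the new filtered-stream call shape used by MirrorMaker above (illustrative,
not from this commit; the consumer property names and the Whitelist pattern are assumptions):

    // Illustrative: create filtered streams with explicit key and value decoders.
    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
    import kafka.serializer.DefaultDecoder

    val consumerProps = new Properties()
    consumerProps.put("group.id", "mirror-example")              // assumed property name
    consumerProps.put("zookeeper.connect", "localhost:2181")     // assumed property name
    val connector = Consumer.create(new ConsumerConfig(consumerProps))
    val streams = connector.createMessageStreamsByFilter(
      new Whitelist("example-topic.*"), 1, new DefaultDecoder(), new DefaultDecoder())
    for (stream <- streams; msgAndMetadata <- stream)
      println("topic=%s bytes=%d".format(msgAndMetadata.topic, msgAndMetadata.message.length))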

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala Thu Nov 15 22:15:14 2012
@@ -62,7 +62,7 @@ object ProducerShell {
         done = true
       } else {
         val message = line.trim
-        producer.send(new ProducerData[String, String](topic, message))
+        producer.send(new KeyedMessage[String, String](topic, message))
         println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu Nov 15 22:15:14 2012
@@ -20,7 +20,7 @@ package kafka.tools
 import joptsimple.OptionParser
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.consumer._
 import kafka.utils.{Logging, ZkUtils}
 import kafka.api.OffsetRequest
@@ -136,7 +136,7 @@ object ReplayLogProducer extends Logging
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
+  class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
     props.put("broker.list", config.brokerList)
@@ -150,7 +150,7 @@ object ReplayLogProducer extends Logging
       props.put("producer.type", "async")
 
     val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[Message, Message](producerConfig)
+    val producer = new Producer[Array[Byte], Array[Byte]](producerConfig)
 
     override def run() {
       info("Starting consumer thread..")
@@ -163,7 +163,7 @@ object ReplayLogProducer extends Logging
             stream
         for (messageAndMetadata <- iter) {
           try {
-            producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
+            producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
             if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
               Thread.sleep(config.delayedMSBtwSend)
             messageCount += 1

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Thu Nov 15 22:15:14 2012
@@ -19,10 +19,12 @@ package kafka.tools
 
 import joptsimple._
 import kafka.utils._
+import kafka.producer.ProducerConfig
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
+import java.util.Properties
 import scala.collection.JavaConversions._
 
 /**
@@ -194,7 +196,9 @@ object SimpleConsumerShell extends Loggi
                 offset = messageAndOffset.nextOffset
                 if(printOffsets)
                   System.out.println("next offset = " + offset)
-                formatter.writeTo(messageAndOffset.message, System.out)
+                val message = messageAndOffset.message
+                val key = if(message.hasKey) Utils.readBytes(message.key) else null
+                formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
               } catch {
                 case e =>
                   if (skipMessageOnError)
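
As a rough sketch of what a formatter compatible with the writeTo(key, value, output) call above
might look like (the MessageFormatter trait itself is not shown in this hunk, so this hypothetical
class stands alone rather than extending it):

    // Hypothetical key/value formatter matching the writeTo call shape above.
    import java.io.PrintStream

    class TabSeparatedFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        val keyText = if (key == null) "<null>" else new String(key, "UTF8")
        output.println(keyText + "\t" + new String(value, "UTF8"))
      }
    }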

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Nov 15 22:15:14 2012
@@ -133,19 +133,25 @@ object Utils extends Logging {
     })
     thread
   }
+  
+  /**
+   * Read the given byte buffer into a byte array
+   */
+  def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit)
 
   /**
    * Read a byte array from the given offset and size in the buffer
-   * TODO: Should use System.arraycopy
    */
   def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
-    val bytes = new Array[Byte](size)
-    var i = 0
-    while(i < size) {
-      bytes(i) = buffer.get(offset + i)
-      i += 1
+    val dest = new Array[Byte](size)
+    if(buffer.hasArray) {
+      System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size)
+    } else {
+      buffer.mark()
+      buffer.get(dest)
+      buffer.reset()
     }
-    bytes
+    dest
   }
 
   /**
@@ -204,7 +210,7 @@ object Utils extends Logging {
    * @param buffer The buffer to translate
    * @param encoding The encoding to use in translating bytes to characters
    */
-  def readString(buffer: ByteBuffer, encoding: String): String = {
+  def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = {
     val bytes = new Array[Byte](buffer.remaining)
     buffer.get(bytes)
     new String(bytes, encoding)
@@ -446,16 +452,10 @@ object Utils extends Logging {
   /**
    * Create an instance of the class with the given class name
    */
-  def createObject[T<:AnyRef](className: String): T = {
-    className match {
-      case null => null.asInstanceOf[T]
-      case _ =>
-        val clazz = Class.forName(className)
-        val clazzT = clazz.asInstanceOf[Class[T]]
-        val constructors = clazzT.getConstructors
-        require(constructors.length == 1)
-        constructors.head.newInstance().asInstanceOf[T]
-    }
+  def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
+    val klass = Class.forName(className).asInstanceOf[Class[T]]
+    val constructor = klass.getConstructor(args.map(_.getClass): _*)
+    constructor.newInstance(args: _*).asInstanceOf[T]
   }
 
   /**
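
A sketch (assumed call site, not shown in this diff) of how the generalized createObject could be
used to instantiate a serializer by class name, passing the VerifiableProperties instance the new
constructor contract expects:

    // Illustrative reflective construction via the revised Utils.createObject.
    import kafka.serializer.Encoder
    import kafka.utils.{Utils, VerifiableProperties}

    val verifiableProps = new VerifiableProperties(new java.util.Properties())
    val encoder = Utils.createObject[Encoder[String]]("kafka.serializer.StringEncoder", verifiableProps)
    val bytes: Array[Byte] = encoder.toBytes("hello")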

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Thu Nov 15 22:15:14 2012
@@ -55,7 +55,7 @@ object TestEndToEndLatency {
     var totalTime = 0.0
     for(i <- 0 until numMessages) {
       var begin = System.nanoTime
-      producer.send(new ProducerData(topic, message))
+      producer.send(new KeyedMessage(topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar