Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/15 23:15:26 UTC
svn commit: r1410055 [1/3] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/
core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/consum...
Author: jkreps
Date: Thu Nov 15 22:15:14 2012
New Revision: 1410055
URL: http://svn.apache.org/viewvc?rev=1410055&view=rev
Log:
KAFKA-544 Store the key given to the producer in the message. Expose this key in the consumer. Patch reviewed by Jun.
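For orientation, a minimal before/after sketch of the producer-side change (topic, key, and payload names are illustrative, not taken from this commit):

    // Before this patch: ProducerData carried (topic, key, data) and the key
    // was used only to pick a partition; it was never written to the log.
    //   producer.send(new ProducerData[String, String]("my-topic", "my-key", List("msg")))

    // After this patch: KeyedMessage stores the key in the message itself,
    // so consumers can read it back via MessageAndMetadata.key.
    producer.send(new KeyedMessage[String, String]("my-topic", "my-key", "msg"))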
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
Modified:
incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Thu Nov 15 22:15:14 2012
@@ -27,9 +27,9 @@ import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
+import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -81,18 +81,17 @@ public class DataGenerator {
public void run() throws Exception {
- List<Message> list = new ArrayList<Message>();
+ List<KeyedMessage> list = new ArrayList<KeyedMessage>();
for (int i = 0; i < _count; i++) {
Long timestamp = RANDOM.nextLong();
if (timestamp < 0) timestamp = -timestamp;
byte[] bytes = timestamp.toString().getBytes("UTF8");
Message message = new Message(bytes);
- list.add(message);
+ list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
- ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
- _producer.send(pd);
+ _producer.send(list);
// close the producer
_producer.close();
Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Thu Nov 15 22:15:14 2012
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
+import kafka.producer.KeyedMessage;
import kafka.message.Message;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -33,7 +33,7 @@ public class KafkaRecordWriter<W extends
protected Producer<Integer, Message> producer;
protected String topic;
- protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
+ protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
protected int totalSize = 0;
protected int queueSize;
@@ -57,7 +57,7 @@ public class KafkaRecordWriter<W extends
public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
{
Message msg = new Message(value.getBytes());
- msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
+ msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
totalSize += msg.size();
// MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Nov 15 22:15:14 2012
@@ -24,7 +24,7 @@ import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
-import kafka.serializer.StringDecoder
+import kafka.serializer._
import kafka.utils._
import kafka.metrics.KafkaMetricsReporter
@@ -179,7 +179,7 @@ object ConsoleConsumer extends Logging {
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
try {
- val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+ val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
val iter = if(maxMessages >= 0)
stream.slice(0, maxMessages)
else
@@ -187,7 +187,7 @@ object ConsoleConsumer extends Logging {
for(messageAndTopic <- iter) {
try {
- formatter.writeTo(messageAndTopic.message, System.out)
+ formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
} catch {
case e =>
if (skipMessageOnError)
@@ -251,36 +251,14 @@ object MessageFormatter {
}
trait MessageFormatter {
- def writeTo(message: Message, output: PrintStream)
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties) {}
def close() {}
}
-class DecodedMessageFormatter extends MessageFormatter {
- var topicStr: String = _
- val decoder = new StringDecoder()
-
- override def init(props: Properties) {
- topicStr = props.getProperty("topic")
- if (topicStr != null)
- topicStr = topicStr + ":"
- else
- topicStr = ""
- }
-
- def writeTo(message: Message, output: PrintStream) {
- try {
- output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
- } catch {
- case e => e.printStackTrace()
- }
- }
-}
-
class NewlineMessageFormatter extends MessageFormatter {
- def writeTo(message: Message, output: PrintStream) {
- val payload = message.payload
- output.write(payload.array, payload.arrayOffset, payload.limit)
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+ output.write(value)
output.write('\n')
}
}
@@ -296,8 +274,8 @@ class ChecksumMessageFormatter extends M
topicStr = ""
}
- def writeTo(message: Message, output: PrintStream) {
- val chksum = message.checksum
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+ val chksum = new Message(value, key).checksum
output.println(topicStr + "checksum:" + chksum)
}
}
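MessageFormatter implementations now receive the raw key bytes alongside the value. A hypothetical formatter that prints both, written against the trait as revised above (the class name and tab-separated layout are illustrative):

    import java.io.PrintStream

    class KeyValueMessageFormatter extends MessageFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        // The key is null for messages produced without one.
        val keyStr = if (key == null) "null" else new String(key, "UTF-8")
        output.println(keyStr + "\t" + new String(value, "UTF-8"))
      }
    }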
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -19,38 +19,53 @@ package kafka.consumer
import scala.collection._
import kafka.utils.Logging
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer._
/**
* Main interface for consumer
*/
trait ConsumerConnector {
+
/**
* Create a list of MessageStreams for each topic.
*
* @param topicCountMap a map of (topic, #streams) pair
- * @param decoder Decoder to decode each Message to type T
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
- def createMessageStreams[T](topicCountMap: Map[String,Int],
- decoder: Decoder[T] = new DefaultDecoder)
- : Map[String,List[KafkaStream[T]]]
-
+ def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
+
+ /**
+ * Create a list of MessageStreams for each topic.
+ *
+ * @param topicCountMap a map of (topic, #streams) pair
+ * @param keyDecoder Decoder to decode the key portion of the message
+ * @param valueDecoder Decoder to decode the value portion of the message
+ * @return a map of (topic, list of KafkaStream) pairs.
+ * The number of items in the list is #streams. Each stream supports
+ * an iterator over message/metadata pairs.
+ */
+ def createMessageStreams[K,V](topicCountMap: Map[String,Int],
+ keyDecoder: Decoder[K],
+ valueDecoder: Decoder[V])
+ : Map[String,List[KafkaStream[K,V]]]
+
/**
* Create a list of message streams for all topics that match a given filter.
*
* @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
* @param numStreams Number of streams to return
- * @param decoder Decoder to decode each Message to type T
+ * @param keyDecoder Decoder to decode the key portion of the message
+ * @param valueDecoder Decoder to decode the value portion of the message
* @return a list of KafkaStream each of which provides an
* iterator over message/metadata pairs over allowed topics.
*/
- def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
- numStreams: Int = 1,
- decoder: Decoder[T] = new DefaultDecoder)
- : Seq[KafkaStream[T]]
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+ numStreams: Int = 1,
+ keyDecoder: Decoder[K] = new DefaultDecoder(),
+ valueDecoder: Decoder[V] = new DefaultDecoder())
+ : Seq[KafkaStream[K,V]]
/**
* Commit the offsets of all broker partitions connected by this connector.
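A hedged usage sketch of the revised connector API, assuming a ConsumerConfig built elsewhere and a StringDecoder that follows the new Decoder contract:

    import kafka.consumer._
    import kafka.serializer.StringDecoder

    val connector = Consumer.create(config)   // config: ConsumerConfig, assumed
    val streams = connector.createMessageStreams(
      Map("my-topic" -> 1), new StringDecoder(), new StringDecoder())
    for (msg <- streams("my-topic").head) {
      // MessageAndMetadata now carries key, partition, and offset (see below).
      println("key=%s value=%s partition=%d offset=%d"
        .format(msg.key, msg.message, msg.partition, msg.offset))
    }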
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Nov 15 22:15:14 2012
@@ -17,7 +17,7 @@
package kafka.consumer
-import kafka.utils.{IteratorTemplate, Logging}
+import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
@@ -30,17 +30,18 @@ import kafka.common.{KafkaException, Mes
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
-class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
- consumerTimeoutMs: Int,
- private val decoder: Decoder[T],
- val enableShallowIterator: Boolean)
- extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
+class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
+ consumerTimeoutMs: Int,
+ private val keyDecoder: Decoder[K],
+ private val valueDecoder: Decoder[V],
+ val enableShallowIterator: Boolean)
+ extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
- private var currentTopicInfo:PartitionTopicInfo = null
+ private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
- override def next(): MessageAndMetadata[T] = {
+ override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
@@ -52,7 +53,7 @@ class ConsumerIterator[T](private val ch
item
}
- protected def makeNext(): MessageAndMetadata[T] = {
+ protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
@@ -103,7 +104,10 @@ class ConsumerIterator[T](private val ch
item.message.ensureValid() // validate checksum of message to ensure it is valid
- new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
+ val keyBuffer = item.message.key
+ val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
+ val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+ new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
}
def clearCurrentChunk() {
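The iterator now pulls the key buffer and the payload apart and runs each through its own decoder's fromBytes. A minimal hypothetical decoder satisfying that contract:

    import kafka.serializer.Decoder

    class Utf8Decoder extends Decoder[String] {
      // fromBytes is the method ConsumerIterator invokes above for key and value.
      def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
    }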
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala Thu Nov 15 22:15:14 2012
@@ -22,19 +22,20 @@ import java.util.concurrent.BlockingQueu
import kafka.serializer.Decoder
import kafka.message.MessageAndMetadata
-class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
- consumerTimeoutMs: Int,
- private val decoder: Decoder[T],
- val enableShallowIterator: Boolean)
- extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
+ consumerTimeoutMs: Int,
+ private val keyDecoder: Decoder[K],
+ private val valueDecoder: Decoder[V],
+ val enableShallowIterator: Boolean)
+ extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
- private val iter: ConsumerIterator[T] =
- new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
+ private val iter: ConsumerIterator[K,V] =
+ new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
/**
* Create an iterator over messages in the stream.
*/
- def iterator(): ConsumerIterator[T] = iter
+ def iterator(): ConsumerIterator[K,V] = iter
/**
* This method clears the queue being iterated during the consumer rebalancing. This is mainly
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -28,13 +28,14 @@ import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import java.util.UUID
-import kafka.serializer.Decoder
+import kafka.serializer._
import kafka.utils.ZkUtils._
import kafka.common._
import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.api.OffsetRequest
import kafka.metrics._
+import kafka.producer.ProducerConfig
/**
@@ -120,17 +121,23 @@ private[kafka] class ZookeeperConsumerCo
KafkaMetricsReporter.startReporters(config.props)
def this(config: ConsumerConfig) = this(config, true)
+
+ def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] =
+ createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
- def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
- : Map[String,List[KafkaStream[T]]] = {
+ def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
+ : Map[String, List[KafkaStream[K,V]]] = {
if (messageStreamCreated.getAndSet(true))
throw new RuntimeException(this.getClass.getSimpleName +
" can create message streams at most once")
- consume(topicCountMap, decoder)
+ consume(topicCountMap, keyDecoder, valueDecoder)
}
- def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
- val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+ numStreams: Int,
+ keyDecoder: Decoder[K] = new DefaultDecoder(),
+ valueDecoder: Decoder[V] = new DefaultDecoder()) = {
+ val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
wildcardStreamsHandler.streams
}
@@ -173,8 +180,8 @@ private[kafka] class ZookeeperConsumerCo
}
}
- def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
- : Map[String,List[KafkaStream[T]]] = {
+ def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
+ : Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
@@ -187,8 +194,8 @@ private[kafka] class ZookeeperConsumerCo
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
- val stream = new KafkaStream[T](
- queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ val stream = new KafkaStream[K,V](
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
(queue, stream)
})
).flatten.toList
@@ -197,7 +204,7 @@ private[kafka] class ZookeeperConsumerCo
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
- loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
+ loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
// this API is used by unit tests only
@@ -293,7 +300,7 @@ private[kafka] class ZookeeperConsumerCo
}
class ZKRebalancerListener(val group: String, val consumerIdString: String,
- val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
+ val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
@@ -473,7 +480,7 @@ private[kafka] class ZookeeperConsumerCo
}
private def closeFetchersForQueues(cluster: Cluster,
- messageStreams: Map[String,List[KafkaStream[_]]],
+ messageStreams: Map[String,List[KafkaStream[_,_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
@@ -496,7 +503,7 @@ private[kafka] class ZookeeperConsumerCo
private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
- messageStreams: Map[String,List[KafkaStream[_]]]) {
+ messageStreams: Map[String,List[KafkaStream[_,_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
queuesTobeCleared.foreach(_.clear)
@@ -510,7 +517,7 @@ private[kafka] class ZookeeperConsumerCo
}
- private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
+ private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
@@ -610,17 +617,17 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def reinitializeConsumer[T](
+ private def reinitializeConsumer[K,V](
topicCount: TopicCount,
- queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+ queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// listener to consumer and partition changes
if (loadBalancerListener == null) {
- val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+ val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(
- config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+ config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// register listener for session expired event
@@ -690,9 +697,10 @@ private[kafka] class ZookeeperConsumerCo
loadBalancerListener.syncedRebalance()
}
- class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+ class WildcardStreamsHandler[K,V](topicFilter: TopicFilter,
numStreams: Int,
- decoder: Decoder[T])
+ keyDecoder: Decoder[K],
+ valueDecoder: Decoder[V])
extends TopicEventHandler[String] {
if (messageStreamCreated.getAndSet(true))
@@ -702,8 +710,11 @@ private[kafka] class ZookeeperConsumerCo
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(e => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
- val stream = new KafkaStream[T](
- queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ val stream = new KafkaStream[K,V](queue,
+ config.consumerTimeoutMs,
+ keyDecoder,
+ valueDecoder,
+ config.enableShallowIterator)
(queue, stream)
}).toList
@@ -760,7 +771,7 @@ private[kafka] class ZookeeperConsumerCo
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
}
- def streams: Seq[KafkaStream[T]] =
+ def streams: Seq[KafkaStream[K,V]] =
wildcardQueuesAndStreams.map(_._2)
}
}
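A short sketch of the filter-based variant with the defaults introduced above (the topic pattern is illustrative; with the default decoders the streams carry raw Array[Byte] keys and values):

    val streams = connector.createMessageStreamsByFilter(new Whitelist("my-.*"), numStreams = 2)
    for (stream <- streams; msg <- stream)
      println("topic=%s partition=%d hasKey=%b".format(msg.topic, msg.partition, msg.key != null))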
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu Nov 15 22:15:14 2012
@@ -20,7 +20,6 @@ package kafka.javaapi.consumer;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
-import kafka.message.Message;
import kafka.serializer.Decoder;
import java.util.List;
@@ -36,10 +35,10 @@ public interface ConsumerConnector {
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
- public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
- Map<String, Integer> topicCountMap, Decoder<T> decoder);
- public Map<String, List<KafkaStream<Message>>> createMessageStreams(
- Map<String, Integer> topicCountMap);
+ public <K,V> Map<String, List<KafkaStream<K,V>>>
+ createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+
+ public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of MessageAndTopicStreams containing messages of type T.
@@ -47,16 +46,17 @@ public interface ConsumerConnector {
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).
* @param numStreams the number of message streams to return.
- * @param decoder a decoder that converts from Message to T
+ * @param keyDecoder a decoder that decodes the message key
+ * @param valueDecoder a decoder that decodes the message itself
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements.
*/
- public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
- TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
- public List<KafkaStream<Message>> createMessageStreamsByFilter(
- TopicFilter topicFilter, int numStreams);
- public List<KafkaStream<Message>> createMessageStreamsByFilter(
- TopicFilter topicFilter);
+ public <K,V> List<KafkaStream<K,V>>
+ createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+
+ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
+
+ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all broker partitions connected by this connector.
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Thu Nov 15 22:15:14 2012
@@ -17,7 +17,7 @@
package kafka.javaapi.consumer
import kafka.message.Message
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer._
import kafka.consumer._
import scala.collection.JavaConversions.asList
@@ -59,7 +59,7 @@ import scala.collection.JavaConversions.
*/
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
- val enableFetcher: Boolean) // for testing only
+ val enableFetcher: Boolean) // for testing only
extends ConsumerConnector {
private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
@@ -67,38 +67,37 @@ private[kafka] class ZookeeperConsumerCo
def this(config: ConsumerConfig) = this(config, true)
// for java client
- def createMessageStreams[T](
+ def createMessageStreams[K,V](
topicCountMap: java.util.Map[String,java.lang.Integer],
- decoder: Decoder[T])
- : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
+ keyDecoder: Decoder[K],
+ valueDecoder: Decoder[V])
+ : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
import scala.collection.JavaConversions._
val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
- val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
- val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
+ val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
+ val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
- var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
+ var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)
}
ret
}
-
- def createMessageStreams(
- topicCountMap: java.util.Map[String,java.lang.Integer])
- : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
- createMessageStreams(topicCountMap, new DefaultDecoder)
-
- def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
- asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
-
- def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
- createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
-
- def createMessageStreamsByFilter(topicFilter: TopicFilter) =
- createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
-
+
+ def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
+ createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
+
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
+ asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+ createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+ createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
+
def commitOffsets() {
underlying.commitOffsets
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Thu Nov 15 22:15:14 2012
@@ -18,6 +18,7 @@
package kafka.javaapi.producer
import kafka.producer.ProducerConfig
+import kafka.producer.KeyedMessage
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
@@ -27,20 +28,17 @@ class Producer[K,V](private val underlyi
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
- def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
- import collection.JavaConversions._
- underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
- asBuffer(producerData.getData)))
+ def send(message: KeyedMessage[K,V]) {
+ underlying.send(message)
}
/**
* Use this API to send data to multiple topics
* @param producerData list of producer data objects that encapsulate the topic, key and message data
*/
- def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
+ def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
- underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
- asBuffer(pd.getData))): _*)
+ underlying.send(asBuffer(messages):_*)
}
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala Thu Nov 15 22:15:14 2012
@@ -17,5 +17,5 @@
package kafka.message
-case class MessageAndMetadata[T](message: T, topic: String = "")
+case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Thu Nov 15 22:15:14 2012
@@ -59,6 +59,14 @@ class BlockingChannel( val host: String,
writeChannel = channel
readChannel = Channels.newChannel(channel.socket().getInputStream)
connected = true
+ // settings may not match what we requested above
+ val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+ debug(msg.format(channel.socket.getSoTimeout,
+ readTimeoutMs,
+ channel.socket.getReceiveBufferSize,
+ readBufferSize,
+ channel.socket.getSendBufferSize,
+ writeBufferSize))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu Nov 15 22:15:14 2012
@@ -111,7 +111,7 @@ object ConsoleProducer {
do {
message = reader.readMessage()
if(message != null)
- producer.send(new ProducerData(topic, message))
+ producer.send(new KeyedMessage(topic, message))
} while(message != null)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala Thu Nov 15 22:15:14 2012
@@ -19,7 +19,9 @@ package kafka.producer
import kafka.utils.Utils
-private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
+import kafka.utils._
+
+private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
private val random = new java.util.Random
def partition(key: T, numPartitions: Int): Int = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Nov 15 22:15:14 2012
@@ -87,8 +87,7 @@ class KafkaLog4jAppender extends Appende
}
else this.layout.format(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
- val messageData : ProducerData[String, String] =
- new ProducerData[String, String](topic, message)
+ val messageData = new KeyedMessage[String, String](topic, message)
producer.send(messageData);
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala?rev=1410055&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KeyedMessage.scala Thu Nov 15 22:15:14 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+/**
+ * A topic, key, and value
+ */
+case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
+ if(topic == null)
+ throw new IllegalArgumentException("Topic cannot be null.")
+
+ def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
+
+ def hasKey = key != null
+}
\ No newline at end of file
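The two construction forms the class supports, per the definition above (names illustrative):

    val keyed   = KeyedMessage("my-topic", "my-key", "my-value")            // hasKey == true
    val unkeyed = new KeyedMessage[String, String]("my-topic", "my-value")  // key is null, hasKey == false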
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Partitioner.scala Thu Nov 15 22:15:14 2012
@@ -16,6 +16,13 @@
*/
package kafka.producer
+/**
+ * A partitioner controls the mapping between user-provided keys and kafka partitions. Users can implement a custom
+ * partitioner to change this mapping.
+ *
+ * Implementations will be constructed via reflection and are required to have a constructor that takes a single
+ * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
+ */
trait Partitioner[T] {
/**
* Uses the key to calculate a partition bucket id for routing
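A hypothetical partitioner honoring the reflection contract described above, i.e. a constructor taking a single VerifiableProperties argument (the hashing scheme is illustrative, not the commit's DefaultPartitioner):

    import kafka.utils.VerifiableProperties

    class KeyHashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
      def partition(key: String, numPartitions: Int): Int =
        // Mask the sign bit so the bucket index stays non-negative.
        if (key == null) 0 else (key.hashCode & 0x7fffffff) % numPartitions
    }

Wired in via the existing partitioner.class config, e.g. props.put("partitioner.class", "my.pkg.KeyHashPartitioner").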
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Thu Nov 15 22:15:14 2012
@@ -33,7 +33,7 @@ extends Logging {
if (config.batchSize > config.queueSize)
throw new InvalidConfigException("Batch size can't be larger than queue size.")
- private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
+ private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
private val random = new Random
private var sync: Boolean = true
@@ -43,9 +43,12 @@ extends Logging {
case "async" =>
sync = false
val asyncProducerID = random.nextInt(Int.MaxValue)
- producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
- eventHandler, config.queueTime, config.batchSize)
- producerSendThread.start
+ producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
+ queue,
+ eventHandler,
+ config.queueTime,
+ config.batchSize)
+ producerSendThread.start()
case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
@@ -54,8 +57,9 @@ extends Logging {
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
- Utils.createObject[Partitioner[K]](config.partitionerClass),
- Utils.createObject[Encoder[V]](config.serializerClass),
+ Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+ Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+ Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
new ProducerPool(config)))
/**
@@ -63,36 +67,36 @@ extends Logging {
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
- def send(producerData: ProducerData[K,V]*) {
+ def send(messages: KeyedMessage[K,V]*) {
if (hasShutdown.get)
throw new ProducerClosedException
- recordStats(producerData: _*)
+ recordStats(messages)
sync match {
- case true => eventHandler.handle(producerData)
- case false => asyncSend(producerData: _*)
+ case true => eventHandler.handle(messages)
+ case false => asyncSend(messages)
}
}
- private def recordStats(producerData: ProducerData[K,V]*) {
- for (data <- producerData) {
- ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
- ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
+ private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
+ for (message <- messages) {
+ ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
+ ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
}
}
- private def asyncSend(producerData: ProducerData[K,V]*) {
- for (data <- producerData) {
+ private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
+ for (message <- messages) {
val added = config.enqueueTimeoutMs match {
case 0 =>
- queue.offer(data)
+ queue.offer(message)
case _ =>
try {
config.enqueueTimeoutMs < 0 match {
case true =>
- queue.put(data)
+ queue.put(message)
true
case _ =>
- queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
@@ -102,10 +106,10 @@ extends Logging {
}
if(!added) {
AsyncProducerStats.droppedMessageRate.mark()
- error("Event queue is full of unsent messages, could not send event: " + data.toString)
- throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
+ error("Event queue is full of unsent messages, could not send event: " + message.toString)
+ throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
- trace("Added to send queue an event: " + data.toString)
+ trace("Added to send queue an event: " + message.toString)
trace("Remaining queue size: " + queue.remainingCapacity)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala Thu Nov 15 22:15:14 2012
@@ -38,6 +38,10 @@ trait AsyncProducerConfig {
/** the number of messages batched at the producer */
val batchSize = props.getInt("batch.size", 200)
- /** the serializer class for events */
+ /** the serializer class for values */
val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
+
+ /** the serializer class for keys (defaults to the same as for values) */
+ val keySerializerClass = props.getString("key.serializer.class", serializerClass)
+
}
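A hedged configuration sketch using the new property (broker/connection settings elided; key.serializer.class falls back to serializer.class when unset):

    val props = new java.util.Properties()
    // ... usual broker/connection settings ...
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))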
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Nov 15 22:15:14 2012
@@ -23,7 +23,7 @@ import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.{Seq, Map}
-import scala.collection.mutable.{ListBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
@@ -31,6 +31,7 @@ import kafka.api.{TopicMetadata, Produce
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner[K],
private val encoder: Encoder[V],
+ private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
@@ -41,13 +42,14 @@ class DefaultEventHandler[K,V](config: P
private val lock = new Object()
- def handle(events: Seq[ProducerData[K,V]]) {
+ def handle(events: Seq[KeyedMessage[K,V]]) {
lock synchronized {
val serializedData = serialize(events)
serializedData.foreach{
- pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
- ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
- ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+ keyed =>
+ val dataSize = keyed.message.payloadSize
+ ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
+ ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
@@ -57,7 +59,7 @@ class DefaultEventHandler[K,V](config: P
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
- Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic).toSet))
+ Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
remainingRetries -= 1
ProducerStats.resendRate.mark()
}
@@ -70,24 +72,23 @@ class DefaultEventHandler[K,V](config: P
}
}
- private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
+ private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
- val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+ val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
try {
- for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
+ for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
- eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
- .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
- val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+ messagesPerBrokerMap.foreach(partitionAndEvent =>
+ trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+ val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedTopicPartitions.foreach(topicPartition => {
- eventsPerBrokerMap.get(topicPartition) match {
+ messagesPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
-
}
})
}
@@ -100,63 +101,61 @@ class DefaultEventHandler[K,V](config: P
}
}
- def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
- val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
- events.foreach {e =>
- val serializedMessages = new ListBuffer[Message]
- for (d <- e.getData) {
- try {
- serializedMessages += encoder.toMessage(d)
- } catch {
- case t =>
- ProducerStats.serializationErrorRate.mark()
- if (isSync)
- throw t
- else {
- // currently, if in async mode, we just log the serialization error. We need to revisit
- // this when doing kafka-496
- error("Error serializing message ", t)
- }
- }
+ def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
+ val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
+ events.map{e =>
+ try {
+ if(e.hasKey)
+ serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+ else
+ serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
+ } catch {
+ case t =>
+ ProducerStats.serializationErrorRate.mark()
+ if (isSync) {
+ throw t
+ } else {
+ // currently, if in async mode, we just log the serialization error. We need to revisit
+ // this when doing kafka-496
+ error("Error serializing message ", t)
+ }
}
- if (serializedMessages.size > 0)
- serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
}
- serializedProducerData
+ serializedMessages
}
- def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
- val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
+ def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
+ val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
try {
- for (event <- events) {
- val topicPartitionsList = getPartitionListForTopic(event)
+ for (message <- messages) {
+ val topicPartitionsList = getPartitionListForTopic(message)
val totalNumPartitions = topicPartitionsList.length
- val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+ val partitionIndex = getPartition(message.key, totalNumPartitions)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
- var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
+ var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
- dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
+ dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
case None =>
- dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
+ dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker)
}
- val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
- var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+ val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
+ var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
- dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+ dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
case None =>
- dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+ dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
}
- dataPerTopicPartition.append(event)
+ dataPerTopicPartition.append(message)
}
Some(ret)
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
@@ -166,13 +165,14 @@ class DefaultEventHandler[K,V](config: P
}
}
- private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[PartitionAndLeader] = {
- debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
- val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
+ private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
+ debug("Getting the number of broker partitions registered for topic: " + m.topic)
+ val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic)
debug("Broker partitions registered for topic: %s are %s"
- .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
+ .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
- if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+ if(totalNumPartitions == 0)
+ throw new NoBrokersForPartitionException("Partition key = " + m.key)
topicPartitionsList
}
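
partitionAndCollate hashes each message's key (via getPartition) into the topic's partition list, then groups messages by leader broker and topic-partition. A hedged sketch of a custom partitioner matching that contract, with the trait signature assumed from the Partitioner.scala change in this commit:

    import kafka.producer.Partitioner

    // Route all messages with the same key to the same partition.
    class ModuloPartitioner extends Partitioner[String] {
      def partition(key: String, numPartitions: Int): Int =
        if (key == null) 0 else math.abs(key.hashCode) % numPartitions
    }
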
@@ -236,7 +236,7 @@ class DefaultEventHandler[K,V](config: P
}
}
- private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
+ private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
/** Enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* enable compression only for the specified topics, if any.
@@ -244,32 +244,29 @@ class DefaultEventHandler[K,V](config: P
* If the compression codec is NoCompressionCodec, compression is disabled for all topics.
*/
- val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
- val topicAndPartition = e._1
- val produceData = e._2
- val messages = new ListBuffer[Message]
- produceData.foreach(p => messages.appendAll(p.getData))
+ val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+ val rawMessages = messages.map(_.message)
( topicAndPartition,
config.compressionCodec match {
case NoCompressionCodec =>
trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
- new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+ new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
case _ =>
if(config.compressedTopics.contains(topicAndPartition.topic)) {
trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+ new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
}
else {
trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
.format(messages.size, topicAndPartition, config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
}
}
}
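
groupMessagesToSet enforces compressed.topics as described in its comment: with a real codec configured, only the listed topics are compressed; with NoCompressionCodec, nothing is. A minimal config sketch, assuming the 0.8 property names compression.codec and compressed.topics:

    props.put("compression.codec", "1")                   // 1 = GZIP
    props.put("compressed.topics", "clicks,impressions")  // only these topics get compressed
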
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/EventHandler.scala Thu Nov 15 22:15:14 2012
@@ -16,7 +16,7 @@
*/
package kafka.producer.async
-import kafka.producer.ProducerData
+import kafka.producer.KeyedMessage
/**
* Handler that dispatches the batched data from the queue.
@@ -27,7 +27,7 @@ trait EventHandler[K,V] {
* Callback to dispatch the batched data and send it to a Kafka server
* @param events the data sent to the producer
*/
- def handle(events: Seq[ProducerData[K,V]])
+ def handle(events: Seq[KeyedMessage[K,V]])
/**
* Cleans up and shuts down the event handler
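
The handler contract is now expressed in terms of KeyedMessage. A hypothetical implementation, for illustration only, that logs each batch instead of dispatching it to a broker:

    import kafka.producer.KeyedMessage
    import kafka.producer.async.EventHandler

    class LoggingEventHandler[K, V] extends EventHandler[K, V] {
      def handle(events: Seq[KeyedMessage[K, V]]) {
        events.foreach(e => println("topic=%s key=%s".format(e.topic, e.key)))
      }
      def close() { }
    }
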
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu Nov 15 22:15:14 2012
@@ -20,28 +20,25 @@ package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer
-import kafka.producer.ProducerData
+import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
class ProducerSendThread[K,V](val threadName: String,
- val queue: BlockingQueue[ProducerData[K,V]],
+ val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
val queueTime: Long,
val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1)
- private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
+ private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
- newGauge(
- "ProducerQueueSize-" + getId,
- new Gauge[Int] {
- def getValue = queue.size
- }
- )
+ newGauge("ProducerQueueSize-" + getId,
+ new Gauge[Int] {
+ def getValue = queue.size
+ })
override def run {
-
try {
processEvents
}catch {
@@ -60,7 +57,7 @@ class ProducerSendThread[K,V](val thread
private def processEvents() {
var lastSend = SystemTime.milliseconds
- var events = new ListBuffer[ProducerData[K,V]]
+ var events = new ListBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
@@ -72,12 +69,8 @@ class ProducerSendThread[K,V](val thread
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
- if(currentQueueItem.getKey == null)
- trace("Dequeued item for topic %s, no partition key, data: %s"
- .format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
- else
- trace("Dequeued item for topic %s, partition key: %s, data: %s"
- .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
+ trace("Dequeued item for topic %s, partition key: %s, data: %s"
+ .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
events += currentQueueItem
}
@@ -85,12 +78,14 @@ class ProducerSendThread[K,V](val thread
full = events.size >= batchSize
if(full || expired) {
- if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
- if(full) debug("Batch full. Sending..")
+ if(expired)
+ debug(elapsed + " ms elapsed. Queue time reached. Sending..")
+ if(full)
+ debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
- events = new ListBuffer[ProducerData[K,V]]
+ events = new ListBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
@@ -100,7 +95,7 @@ class ProducerSendThread[K,V](val thread
.format(queue.size))
}
- def tryToHandle(events: Seq[ProducerData[K,V]]) {
+ def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
val size = events.size
try {
debug("Handling " + size + " events")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala Thu Nov 15 22:15:14 2012
@@ -17,21 +17,44 @@
package kafka.serializer
-import kafka.message.Message
+import kafka.message._
+import kafka.utils.VerifiableProperties
+/**
+ * A decoder is a method of turning byte arrays into objects.
+ * An implementation is required to provide a constructor that
+ * takes a VerifiableProperties instance.
+ */
trait Decoder[T] {
- def toEvent(message: Message):T
+ def fromBytes(bytes: Array[Byte]): T
}
-class DefaultDecoder extends Decoder[Message] {
- def toEvent(message: Message):Message = message
+/**
+ * The default implementation does nothing: it just returns the same byte array it takes in.
+ */
+class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
+ def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
-class StringDecoder extends Decoder[String] {
- def toEvent(message: Message):String = {
- val buf = message.payload
- val arr = new Array[Byte](buf.remaining)
- buf.get(arr)
- new String(arr)
+/**
+ * Decode messages without any key
+ */
+class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
+ def fromBytes(bytes: Array[Byte]) = new Message(bytes)
+}
+
+/**
+ * The string decoder translates bytes into strings. It uses UTF8 by default but takes
+ * an optional property serializer.encoding to control this.
+ */
+class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+ val encoding =
+ if(props == null)
+ "UTF8"
+ else
+ props.getString("serializer.encoding", "UTF8")
+
+ def fromBytes(bytes: Array[Byte]): String = {
+ new String(bytes, encoding)
}
}
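
A Decoder now maps raw byte arrays to objects and must expose a constructor taking a VerifiableProperties. A hypothetical custom decoder, not part of this patch:

    import java.nio.ByteBuffer
    import kafka.serializer.Decoder
    import kafka.utils.VerifiableProperties

    // Decode a 4-byte big-endian integer payload.
    class IntDecoder(props: VerifiableProperties = null) extends Decoder[Int] {
      def fromBytes(bytes: Array[Byte]): Int = ByteBuffer.wrap(bytes).getInt
    }
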
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Encoder.scala Thu Nov 15 22:15:14 2012
@@ -17,16 +17,44 @@
package kafka.serializer
-import kafka.message.Message
+import kafka.utils.VerifiableProperties
+import kafka.message._
+import kafka.utils.Utils
+/**
+ * An encoder is a method of turning objects into byte arrays.
+ * An implementation is required to provide a constructor that
+ * takes a VerifiableProperties instance.
+ */
trait Encoder[T] {
- def toMessage(event: T):Message
+ def toBytes(t: T): Array[Byte]
}
-class DefaultEncoder extends Encoder[Message] {
- override def toMessage(event: Message):Message = event
+/**
+ * The default implementation is a no-op: it just returns the same array it takes in.
+ */
+class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
+ override def toBytes(value: Array[Byte]): Array[Byte] = value
}
-class StringEncoder extends Encoder[String] {
- override def toMessage(event: String):Message = new Message(event.getBytes)
+class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
+ override def toBytes(value: T): Array[Byte] = null
+}
+
+/**
+ * The string encoder takes an optional property serializer.encoding, which controls
+ * the character set used to encode strings as bytes.
+ */
+class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
+ val encoding =
+ if(props == null)
+ "UTF8"
+ else
+ props.getString("serializer.encoding", "UTF8")
+
+ override def toBytes(s: String): Array[Byte] =
+ if(s == null)
+ null
+ else
+ s.getBytes(encoding)
}
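
A round trip through the new string serializers, exercising the serializer.encoding property shown above; a sketch, assuming VerifiableProperties simply wraps a java.util.Properties instance:

    import java.util.Properties
    import kafka.serializer.{StringDecoder, StringEncoder}
    import kafka.utils.VerifiableProperties

    val p = new Properties()
    p.put("serializer.encoding", "UTF-16")
    val vp = new VerifiableProperties(p)
    val bytes = new StringEncoder(vp).toBytes("héllo")
    assert(new StringDecoder(vp).fromBytes(bytes) == "héllo")
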
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/KafkaMigrationTool.java Thu Nov 15 22:15:14 2012
@@ -19,8 +19,8 @@ package kafka.tools;
import joptsimple.*;
import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
+import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
import scala.collection.Iterator;
@@ -282,7 +282,7 @@ public class KafkaMigrationTool
((ByteBuffer)payload_07).get(bytes);
Message message_08 = new Message(bytes);
logger.debug(String.format("Send kafka 08 message of size %d to topic %s", message_08.size(), topic));
- ProducerData<String, Message> producerData = new ProducerData((String)topic, message_08);
+ KeyedMessage<String, Message> producerData = new KeyedMessage((String)topic, null, message_08);
Producer nextProducer = producerCircularIterator.next();
nextProducer.send(producerData);
}
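
As the migration tool shows, KeyedMessage takes an explicit key argument where ProducerData inferred one. Both constructor shapes used in this patch, sketched in Scala:

    import kafka.producer.KeyedMessage

    val unkeyed = new KeyedMessage[String, String]("topic", "value")         // key left null
    val keyed   = new KeyedMessage[String, String]("topic", "key", "value")  // partitioned by key
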
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala Thu Nov 15 22:15:14 2012
@@ -20,10 +20,11 @@ package kafka.tools
import kafka.message.Message
import joptsimple.OptionParser
import kafka.utils.{Utils, CommandLineUtils, Logging}
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
import kafka.consumer._
+import kafka.serializer._
object MirrorMaker extends Logging {
@@ -92,7 +93,7 @@ object MirrorMaker extends Logging {
val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
val config = new ProducerConfig(
Utils.loadProps(options.valueOf(producerConfigOpt)))
- new Producer[Null, Message](config)
+ new Producer[Array[Byte], Array[Byte]](config)
})
val threads = {
@@ -113,11 +114,9 @@ object MirrorMaker extends Logging {
new Blacklist(options.valueOf(blacklistOpt))
val streams =
- connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
+ connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
- streams.flatten.zipWithIndex.map(streamAndIndex => {
- new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
- })
+ streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
}
threads.foreach(_.start())
@@ -125,8 +124,8 @@ object MirrorMaker extends Logging {
threads.foreach(_.awaitShutdown())
}
- class MirrorMakerThread(stream: KafkaStream[Message],
- producers: Seq[Producer[Null, Message]],
+ class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
+ producers: Seq[Producer[Array[Byte], Array[Byte]]],
threadId: Int)
extends Thread with Logging {
@@ -140,16 +139,14 @@ object MirrorMaker extends Logging {
try {
for (msgAndMetadata <- stream) {
val producer = producerSelector.next()
- val pd = new ProducerData[Null, Message](
+ val pd = new KeyedMessage[Array[Byte], Array[Byte]](
msgAndMetadata.topic, msgAndMetadata.message)
producer.send(pd)
}
- }
- catch {
+ } catch {
case e =>
fatal("%s stream unexpectedly exited.", e)
- }
- finally {
+ } finally {
shutdownLatch.countDown()
info("Stopped thread %s.".format(threadName))
}
@@ -158,8 +155,7 @@ object MirrorMaker extends Logging {
def awaitShutdown() {
try {
shutdownLatch.await()
- }
- catch {
+ } catch {
case e: InterruptedException => fatal(
"Shutdown of thread %s interrupted. This might leak data!"
.format(threadName))
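
MirrorMaker now passes explicit decoders when creating filtered streams, and each consumed MessageAndMetadata carries the key alongside the payload. A consumption sketch under those assumptions (consumerProps is a placeholder for real consumer settings):

    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
    import kafka.serializer.DefaultDecoder

    val connector = Consumer.create(new ConsumerConfig(consumerProps))
    val stream = connector.createMessageStreamsByFilter(
      new Whitelist("test-topic"), 1, new DefaultDecoder(), new DefaultDecoder()).head
    for (m <- stream) {
      val key = if (m.key == null) "-" else new String(m.key)
      println("key=%s value=%s".format(key, new String(m.message)))
    }
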
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala Thu Nov 15 22:15:14 2012
@@ -62,7 +62,7 @@ object ProducerShell {
done = true
} else {
val message = line.trim
- producer.send(new ProducerData[String, String](topic, message))
+ producer.send(new KeyedMessage[String, String](topic, message))
println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu Nov 15 22:15:14 2012
@@ -20,7 +20,7 @@ package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{Logging, ZkUtils}
import kafka.api.OffsetRequest
@@ -136,7 +136,7 @@ object ReplayLogProducer extends Logging
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
}
- class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
+ class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()
props.put("broker.list", config.brokerList)
@@ -150,7 +150,7 @@ object ReplayLogProducer extends Logging
props.put("producer.type", "async")
val producerConfig = new ProducerConfig(props)
- val producer = new Producer[Message, Message](producerConfig)
+ val producer = new Producer[Array[Byte], Array[Byte]](producerConfig)
override def run() {
info("Starting consumer thread..")
@@ -163,7 +163,7 @@ object ReplayLogProducer extends Logging
stream
for (messageAndMetadata <- iter) {
try {
- producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
+ producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
messageCount += 1
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Thu Nov 15 22:15:14 2012
@@ -19,10 +19,12 @@ package kafka.tools
import joptsimple._
import kafka.utils._
+import kafka.producer.ProducerConfig
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
+import java.util.Properties
import scala.collection.JavaConversions._
/**
@@ -194,7 +196,9 @@ object SimpleConsumerShell extends Loggi
offset = messageAndOffset.nextOffset
if(printOffsets)
System.out.println("next offset = " + offset)
- formatter.writeTo(messageAndOffset.message, System.out)
+ val message = messageAndOffset.message
+ val key = if(message.hasKey) Utils.readBytes(message.key) else null
+ formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
} catch {
case e =>
if (skipMessageOnError)
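
SimpleConsumerShell now hands the formatter the key and the payload as separate byte arrays. A formatter sketch matching that writeTo shape; the MessageFormatter trait itself lives in ConsoleConsumer.scala, so its exact members are assumed here and the class below deliberately stands alone:

    import java.io.PrintStream

    // Hypothetical formatter: tab-separate key and value, one message per line.
    class TabFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        if (key != null) { output.write(key); output.print('\t') }
        output.write(value)
        output.print('\n')
      }
    }
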
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Nov 15 22:15:14 2012
@@ -133,19 +133,25 @@ object Utils extends Logging {
})
thread
}
+
+ /**
+ * Read the given byte buffer into a byte array
+ */
+ def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit)
/**
* Read a byte array from the given offset and size in the buffer
- * TODO: Should use System.arraycopy
*/
def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
- val bytes = new Array[Byte](size)
- var i = 0
- while(i < size) {
- bytes(i) = buffer.get(offset + i)
- i += 1
+ val dest = new Array[Byte](size)
+ if(buffer.hasArray) {
+ System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size)
+ } else {
+ buffer.mark()
+ buffer.get(dest)
+ buffer.reset()
}
- bytes
+ dest
}
/**
@@ -204,7 +210,7 @@ object Utils extends Logging {
* @param buffer The buffer to translate
* @param encoding The encoding to use in translating bytes to characters
*/
- def readString(buffer: ByteBuffer, encoding: String): String = {
+ def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = {
val bytes = new Array[Byte](buffer.remaining)
buffer.get(bytes)
new String(bytes, encoding)
@@ -446,16 +452,10 @@ object Utils extends Logging {
/**
* Create an instance of the class with the given class name
*/
- def createObject[T<:AnyRef](className: String): T = {
- className match {
- case null => null.asInstanceOf[T]
- case _ =>
- val clazz = Class.forName(className)
- val clazzT = clazz.asInstanceOf[Class[T]]
- val constructors = clazzT.getConstructors
- require(constructors.length == 1)
- constructors.head.newInstance().asInstanceOf[T]
- }
+ def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
+ val klass = Class.forName(className).asInstanceOf[Class[T]]
+ val constructor = klass.getConstructor(args.map(_.getClass): _*)
+ constructor.newInstance(args: _*).asInstanceOf[T]
}
/**
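
The generalized createObject forwards constructor arguments, which is what lets encoders and decoders be instantiated from config with a VerifiableProperties argument. A usage sketch; note that getConstructor matches the runtime classes of the arguments exactly, so the argument must be a VerifiableProperties instance rather than a subclass:

    import java.util.Properties
    import kafka.serializer.StringDecoder
    import kafka.utils.{Utils, VerifiableProperties}

    val vp = new VerifiableProperties(new Properties)
    val decoder = Utils.createObject[StringDecoder]("kafka.serializer.StringDecoder", vp)
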
Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Thu Nov 15 22:15:14 2012
@@ -55,7 +55,7 @@ object TestEndToEndLatency {
var totalTime = 0.0
for(i <- 0 until numMessages) {
var begin = System.nanoTime
- producer.send(new ProducerData(topic, message))
+ producer.send(new KeyedMessage(topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar