You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC
[28/30] git commit: ProducerRequest should take ByteBufferMessageSet
instead of MessageSet; patched by Jun Rao; reviewed by Neha Narkhede;
KAFKA-632
ProducerRequest should take ByteBufferMessageSet instead of MessageSet; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-632
git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1414917 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdb04bba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdb04bba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdb04bba
Branch: refs/heads/trunk
Commit: bdb04bba67326bdf1451df3ec4a855ed3c109869
Parents: 961affa
Author: Jun Rao <ju...@apache.org>
Authored: Wed Nov 28 21:05:17 2012 +0000
Committer: Jun Rao <ju...@apache.org>
Committed: Wed Nov 28 21:05:17 2012 +0000
----------------------------------------------------------------------
.../src/main/scala/kafka/api/ProducerRequest.scala | 6 +++---
.../kafka/producer/async/DefaultEventHandler.scala | 6 +-----
.../unit/kafka/producer/SyncProducerTest.scala | 2 +-
3 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 1713dee..87700a0 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -57,7 +57,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Map[TopicAndPartition, MessageSet])
+ data: Map[TopicAndPartition, ByteBufferMessageSet])
extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
/**
@@ -69,7 +69,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Map[TopicAndPartition, MessageSet]) =
+ data: Map[TopicAndPartition, ByteBufferMessageSet]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
def writeTo(buffer: ByteBuffer) {
@@ -88,7 +88,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
topicAndPartitionData.foreach(partitionAndData => {
val partition = partitionAndData._1.partition
val partitionMessageData = partitionAndData._2
- val bytes = partitionMessageData.asInstanceOf[ByteBufferMessageSet].buffer
+ val bytes = partitionMessageData.buffer
buffer.putInt(partition)
buffer.putInt(bytes.limit)
buffer.put(bytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a84be2d..9be87d0 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -210,12 +210,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
- val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
- case (topicAndPartition, messages) =>
- (topicAndPartition, messages)
- }
val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
- config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
+ config.requestTimeoutMs, messagesPerTopic)
try {
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdb04bba/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index ae34315..744554c 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -85,7 +85,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, MessageSet]())
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)