You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/27 02:36:22 UTC
git commit: KAFKA-671 DelayedProduce requests should not hold full
producer request data; reviewed by Neha Narkhede, Jun Rao and Jay Kreps
Updated Branches:
refs/heads/0.8 4f752cab1 -> 89622c8e8
KAFKA-671 DelayedProduce requests should not hold full producer request data; reviewed by Neha Narkhede, Jun Rao and Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89622c8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89622c8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89622c8e
Branch: refs/heads/0.8
Commit: 89622c8e88ddddeddb8c3449ef1782c1d399c2b0
Parents: 4f752ca
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Feb 26 17:36:06 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 26 17:36:15 2013 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/api/ProducerRequest.scala | 13 +++++++---
.../main/scala/kafka/network/RequestChannel.scala | 6 ++--
.../kafka/producer/async/DefaultEventHandler.scala | 8 +++---
core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++++++++------
.../api/RequestResponseSerializationTest.scala | 2 +-
.../unit/kafka/network/SocketServerTest.scala | 10 +++++---
.../unit/kafka/producer/SyncProducerTest.scala | 5 ++-
7 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 72b2cba..916fb59 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -49,7 +49,7 @@ object ProducerRequest {
})
})
- ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
+ ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*))
}
}
@@ -58,19 +58,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Map[TopicAndPartition, ByteBufferMessageSet])
+ data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
/**
* Partitions the data into a map of maps (one for each topic).
*/
private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+ val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap
def this(correlationId: Int,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Map[TopicAndPartition, ByteBufferMessageSet]) =
+ data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
def writeTo(buffer: ByteBuffer) {
@@ -130,7 +131,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
producerRequest.append("; ClientId: " + clientId)
producerRequest.append("; RequiredAcks: " + requiredAcks)
producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
- producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(","))
+ producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
producerRequest.toString()
}
@@ -142,5 +143,9 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
+
+ def emptyData(){
+ data.clear()
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 931092d..209fdfa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,7 +32,7 @@ object RequestChannel extends Logging {
val AllDone = new Request(1, 2, getShutdownReceive(), 0)
def getShutdownReceive() = {
- val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
+ val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
byteBuffer.putShort(RequestKeys.ProduceKey)
emptyProducerRequest.writeTo(byteBuffer)
@@ -40,13 +40,13 @@ object RequestChannel extends Logging {
byteBuffer
}
- case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
+ case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
@volatile var dequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
val requestId = buffer.getShort()
val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
- buffer.rewind()
+ buffer = null
trace("Received request : %s".format(requestObj))
def updateRequestMetrics() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index dab1b9c..27b16e3 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -142,8 +142,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
serializedMessages
}
- def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
- val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
+ def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
+ val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
try {
for (message <- messages) {
val topicPartitionsList = getPartitionListForTopic(message)
@@ -227,7 +227,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/
- private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
+ private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
messagesPerTopic.keys.toSeq
@@ -270,7 +270,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
- private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+ private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5351e7c..cfabfc1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,8 +99,8 @@ class KafkaApis(val requestChannel: RequestChannel,
* Check if a partitionData from a produce request can unblock any
* DelayedFetch requests.
*/
- def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
- val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
+ def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
+ val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
// send any newly unblocked responses
@@ -122,7 +122,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
produceRequest.data.foreach(partitionAndData =>
- maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
+ maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
val allPartitionHaveReplicationFactorOne =
!produceRequest.data.keySet.exists(
@@ -162,6 +162,8 @@ class KafkaApis(val requestChannel: RequestChannel,
debug(satisfiedProduceRequests.size +
" producer requests unblocked during produce to local log.")
satisfiedProduceRequests.foreach(_.respond())
+ // we do not need the data anymore
+ produceRequest.emptyData()
}
}
@@ -438,14 +440,14 @@ class KafkaApis(val requestChannel: RequestChannel,
* A holding pen for fetch requests waiting to be satisfied
*/
class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
- extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
+ extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/
- def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
- val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
+ def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
+ val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
accumulatedSize >= delayedFetch.fetch.minBytes
}
@@ -546,8 +548,8 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchPartitionStatus.error = ErrorMapping.NoError
}
if (!fetchPartitionStatus.acksPending) {
- val messages = produce.data(followerFetchRequestKey.topicAndPartition)
- maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
+ val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
+ maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 26f31ec..d0c7b90 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -61,7 +61,7 @@ object SerializationTestUtils{
case(partitionDataMessage, partition) =>
(TopicAndPartition(topic, partition), partitionDataMessage)
})
- collection.immutable.Map(groupedData:_*)
+ collection.mutable.Map(groupedData:_*)
}
private val requestInfos = collection.immutable.Map(
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9322b2c..b347e66 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -60,8 +60,10 @@ class SocketServerTest extends JUnitSuite {
/* A simple request handler that just echos back the response */
def processRequest(channel: RequestChannel) {
val request = channel.receiveRequest
- val id = request.buffer.getShort
- val send = new BoundedByteBufferSend(request.buffer.slice)
+ val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
+ request.requestObj.writeTo(byteBuffer)
+ byteBuffer.rewind()
+ val send = new BoundedByteBufferSend(byteBuffer)
channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
}
@@ -80,7 +82,7 @@ class SocketServerTest extends JUnitSuite {
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
val emptyRequest =
- new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+ new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
emptyRequest.writeTo(byteBuffer)
@@ -110,7 +112,7 @@ class SocketServerTest extends JUnitSuite {
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack: Short = 0
val emptyRequest =
- new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+ new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
emptyRequest.writeTo(byteBuffer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 81b2736..b5ee31d 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -78,7 +78,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack: Short = 1
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)
Assert.assertTrue(response != null)
@@ -192,7 +193,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack: Short = 0
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)
Assert.assertTrue(response == null)