Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/27 02:36:22 UTC

git commit: KAFKA-671 DelayedProduce requests should not hold full producer request data; reviewed by Neha Narkhede, Jun Rao and Jay Kreps

Updated Branches:
  refs/heads/0.8 4f752cab1 -> 89622c8e8


KAFKA-671 DelayedProduce requests should not hold full producer request data; reviewed by Neha Narkhede, Jun Rao and Jay Kreps
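
In brief: the patch stops DelayedProduce requests from retaining the full
ByteBufferMessageSet payloads. The producer request now precomputes a small
per-partition size map up front and clears its (now mutable) data map once all
responses have been sent, so a request sitting in the purgatory only pins the
size map. A minimal sketch of the idea, using stand-in types rather than the
committed code below:

    // Stand-ins for the real kafka.common.TopicAndPartition and
    // kafka.message.ByteBufferMessageSet, just to keep the sketch self-contained.
    case class TopicAndPartition(topic: String, partition: Int)
    class MessageSet(val sizeInBytes: Int)

    class ProducerRequestSketch(
        val data: collection.mutable.Map[TopicAndPartition, MessageSet]) {

      // Capture only the per-partition sizes up front; this small immutable
      // map is all the delayed-request bookkeeping needs later on.
      val topicPartitionMessageSizeMap: Map[TopicAndPartition, Int] =
        data.map { case (tp, messages) => tp -> messages.sizeInBytes }.toMap

      // Once the messages are appended to the local log, drop the payloads so
      // a DelayedProduce waiting in the purgatory no longer holds the request data.
      def emptyData() { data.clear() }
    }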


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89622c8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89622c8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89622c8e

Branch: refs/heads/0.8
Commit: 89622c8e88ddddeddb8c3449ef1782c1d399c2b0
Parents: 4f752ca
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Feb 26 17:36:06 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 26 17:36:15 2013 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/ProducerRequest.scala |   13 +++++++---
 .../main/scala/kafka/network/RequestChannel.scala  |    6 ++--
 .../kafka/producer/async/DefaultEventHandler.scala |    8 +++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   18 ++++++++------
 .../api/RequestResponseSerializationTest.scala     |    2 +-
 .../unit/kafka/network/SocketServerTest.scala      |   10 +++++---
 .../unit/kafka/producer/SyncProducerTest.scala     |    5 ++-
 7 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 72b2cba..916fb59 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -49,7 +49,7 @@ object ProducerRequest {
       })
     })
 
-    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
+    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*))
   }
 }
 
@@ -58,19 +58,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, ByteBufferMessageSet])
+                           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
   private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+  val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap
 
   def this(correlationId: Int,
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, ByteBufferMessageSet]) =
+           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
@@ -130,7 +131,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     producerRequest.append("; ClientId: " + clientId)
     producerRequest.append("; RequiredAcks: " + requiredAcks)
     producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(","))
+    producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
     producerRequest.toString()
   }
 
@@ -142,5 +143,9 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
+
+  def emptyData(){
+    data.clear()
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 931092d..209fdfa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,7 +32,7 @@ object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)
@@ -40,13 +40,13 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
+  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
-    buffer.rewind()
+    buffer = null
     trace("Received request : %s".format(requestObj))
 
     def updateRequestMetrics() {

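In the same spirit, RequestChannel.Request now takes its ByteBuffer as a
private var and nulls it immediately after deserializing the request object,
so the raw network bytes become garbage-collectable even while the request
object lives on. A hedged sketch of that eager-deserialize-then-release
pattern (simplified, not the full Request class):

    import java.nio.ByteBuffer

    class EagerRequest(private var buffer: ByteBuffer) {
      // Deserialize everything we need from the buffer right away ...
      val requestId: Short = buffer.getShort()
      val payload: Array[Byte] = {
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes)
        bytes
      }
      // ... then release the network buffer; only the parsed object survives.
      buffer = null
    }
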
http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index dab1b9c..27b16e3 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -142,8 +142,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     serializedMessages
   }
 
-  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
-    val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
+  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
+    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
@@ -227,7 +227,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
    * @return the set (topic, partitions) messages which incurred an error sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
+  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
     if(brokerId < 0) {
       warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
       messagesPerTopic.keys.toSeq
@@ -270,7 +270,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
       *  If the compression codec is anything other than NoCompressionCodec,
       *    Enable compression only for specified topics if any

http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5351e7c..cfabfc1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,8 +99,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
+  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
@@ -122,7 +122,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
 
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
@@ -162,6 +162,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug(satisfiedProduceRequests.size +
         " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
+      // we do not need the data anymore
+      produceRequest.emptyData()
     }
   }
   
@@ -438,14 +440,14 @@ class KafkaApis(val requestChannel: RequestChannel,
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-          extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
+          extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
+    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -546,8 +548,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val messages = produce.data(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
+          val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
+          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
         }
       }
 
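With the message payloads now clearable, the FetchRequestPurgatory keys its
satisfaction check on accumulated byte counts (Int) instead of MessageSet
references. A simplified sketch of that check (a stand-in class for
illustration, not the real DelayedFetch):

    import java.util.concurrent.atomic.AtomicLong

    class DelayedFetchSketch(val minBytes: Int) {
      val bytesAccumulated = new AtomicLong(0)

      // Satisfied once enough bytes have been produced to meet min_bytes;
      // only the running size total is needed, never the messages themselves.
      def checkSatisfied(messageSizeInBytes: Int): Boolean =
        bytesAccumulated.addAndGet(messageSizeInBytes) >= minBytes
    }
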

http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 26f31ec..d0c7b90 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -61,7 +61,7 @@ object SerializationTestUtils{
         case(partitionDataMessage, partition) =>
           (TopicAndPartition(topic, partition), partitionDataMessage)
       })
-    collection.immutable.Map(groupedData:_*)
+    collection.mutable.Map(groupedData:_*)
   }
 
   private val requestInfos = collection.immutable.Map(

http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9322b2c..b347e66 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -60,8 +60,10 @@ class SocketServerTest extends JUnitSuite {
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
     val request = channel.receiveRequest
-    val id = request.buffer.getShort
-    val send = new BoundedByteBufferSend(request.buffer.slice)
+    val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
+    request.requestObj.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val send = new BoundedByteBufferSend(byteBuffer)
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
@@ -80,7 +82,7 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)
@@ -110,7 +112,7 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack: Short = 0
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/89622c8e/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 81b2736..b5ee31d 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -78,7 +78,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack: Short = 1
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
     Assert.assertTrue(response != null)
@@ -192,7 +193,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack: Short = 0
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
     Assert.assertTrue(response == null)