You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 21:43:38 UTC

kafka git commit: KAFKA-5250: Do fetch down conversion after throttling

Repository: kafka
Updated Branches:
  refs/heads/trunk 65edd64ca -> 96959bc56


KAFKA-5250: Do fetch down conversion after throttling

Perform down conversion after throttling to avoid retaining
messages in memory during throttling since this could result
in OOM. Also update bytesOut metrics after throttling.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3068 from rajinisivaram/KAFKA-5250


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96959bc5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96959bc5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96959bc5

Branch: refs/heads/trunk
Commit: 96959bc5620b6dc5900173a15cccfe83d20be944
Parents: 65edd64
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue May 16 08:49:21 2017 -0400
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 22:32:18 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 87 ++++++++++++--------
 1 file changed, 51 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96959bc5/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 02a1103..db15f72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -492,37 +492,41 @@ class KafkaApis(val requestChannel: RequestChannel,
         FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
-    // the callback for sending a fetch response
-    def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
-      val convertedPartitionData = {
-        responsePartitionData.map { case (tp, data) =>
-
-          // Down-conversion of the fetched records is needed when the stored magic version is
-          // greater than that supported by the client (as indicated by the fetch request version). If the
-          // configured magic version for the topic is less than or equal to that supported by the version of the
-          // fetch request, we skip the iteration through the records in order to check the magic version since we
-          // know it must be supported. However, if the magic version is changed from a higher version back to a
-          // lower version, this check will no longer be valid and we will fail to down-convert the messages
-          // which were written in the new format prior to the version downgrade.
-          val convertedData = replicaManager.getMagic(tp) match {
-            case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
-              trace(s"Down converting message to V0 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
-
-            case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
-              trace(s"Down converting message to V1 for fetch request from $clientId")
-              FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
-
-            case _ => data
-          }
+    def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
+
+      // Down-conversion of the fetched records is needed when the stored magic version is
+      // greater than that supported by the client (as indicated by the fetch request version). If the
+      // configured magic version for the topic is less than or equal to that supported by the version of the
+      // fetch request, we skip the iteration through the records in order to check the magic version since we
+      // know it must be supported. However, if the magic version is changed from a higher version back to a
+      // lower version, this check will no longer be valid and we will fail to down-convert the messages
+      // which were written in the new format prior to the version downgrade.
+      replicaManager.getMagic(tp) match {
+        case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
+          trace(s"Down converting message to V0 for fetch request from $clientId")
+          new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+
+        case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
+          trace(s"Down converting message to V1 for fetch request from $clientId")
+          new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+
+        case _ => data
+      }
+    }
 
-          val abortedTransactions = convertedData.abortedTransactions.map(_.asJava).orNull
-          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            convertedData.logStartOffset, abortedTransactions, convertedData.records)
+    // the callback for processing a fetch response, invoked before throttling
+    def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
+      val partitionData = {
+        responsePartitionData.map { case (tp, data) =>
+          val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
+          tp -> new FetchResponse.PartitionData(data.error, data.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            data.logStartOffset, abortedTransactions, data.records)
         }
       }
 
-      val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
+      val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
 
       val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
 
@@ -532,17 +536,22 @@ class KafkaApis(val requestChannel: RequestChannel,
             s"on partition $topicPartition failed due to ${data.error.exceptionName}")
 
         fetchedPartitionData.put(topicPartition, data)
-
-        // record the bytes out metrics only when the response is being sent
-        brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
       }
 
-      val response = new FetchResponse(fetchedPartitionData, 0)
-      val responseStruct = response.toStruct(versionId)
-
+      // fetch response callback invoked after any throttling
       def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
         def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
+          val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+          fetchedPartitionData.asScala.foreach(e => convertedData.put(e._1, convertedPartitionData(e._1, e._2)))
+          val response = new FetchResponse(convertedData, 0)
+          val responseStruct = response.toStruct(versionId)
+
           trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+          response.responseData.asScala.foreach { case (topicPartition, data) =>
+            // record the bytes out metrics only when the response is being sent
+            brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
+          }
+
           val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
           new RequestChannel.Response(request, responseSend)
         }
@@ -555,7 +564,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
       }
 
-      // When this callback is triggered, the remote API call has completed
+      // When this callback is triggered, the remote API call has completed.
+      // Record time before any byte-rate throttling.
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       if (fetchRequest.isFromFollower) {
@@ -564,13 +574,18 @@ class KafkaApis(val requestChannel: RequestChannel,
         quotas.leader.record(responseSize)
         fetchResponseCallback(bandwidthThrottleTimeMs = 0)
       } else {
+        // Fetch size used to determine throttle time is calculated before any down conversions.
+        // This may be slightly different from the actual response size. But since down conversions
+        // result in data being loaded into memory, it is better to down-convert after throttling to avoid OOM.
+        val response = new FetchResponse(fetchedPartitionData, 0)
+        val responseStruct = response.toStruct(versionId)
         quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
       }
     }
 
     if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Seq.empty)
+      processResponseCallback(Seq.empty)
     else {
       // call the replica manager to fetch messages from the local replica
       replicaManager.fetchMessages(
@@ -581,7 +596,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         versionId <= 2,
         authorizedRequestInfo,
         replicationQuota(fetchRequest),
-        sendResponseCallback,
+        processResponseCallback,
         fetchRequest.isolationLevel)
     }
   }