Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/05 05:49:11 UTC

[GitHub] [kafka] chia7712 opened a new pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

chia7712 opened a new pull request #10269:
URL: https://github.com/apache/kafka/pull/10269


   The fetch data generated by `KafkaApis` is regrouped when it is converted to `FetchResponse`. That is unnecessary, since `KafkaApis` can keep the fetch data in a collection that is already grouped by topic. The other main changes are:
   
   1. remove `FetchResponse#of`
   1. remove useless constructor from `FetchResponse`
   1. remove `PartitionIterator`
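A minimal sketch of the idea: partitions are grouped by topic at the moment they are added, so no regrouping pass is needed when the response is built. It assumes simplified stand-ins for `FetchResponseData.PartitionData` and `FetchResponseData.FetchableTopicResponse`; `GroupedFetchData`, `PartitionData`, `TopicResponse` and this `addPartition` are hypothetical and are not the code in this PR.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for FetchResponseData.PartitionData and
// FetchResponseData.FetchableTopicResponse (not the real Kafka classes).
final case class PartitionData(partitionIndex: Int, errorCode: Short)
final case class TopicResponse(topic: String, partitions: mutable.ArrayBuffer[PartitionData])

// Hypothetical accumulator that keeps fetch data grouped by topic as it is built.
final class GroupedFetchData {
  // LinkedHashMap keeps topics in the order they were first seen.
  private val byTopic = mutable.LinkedHashMap.empty[String, TopicResponse]

  // Appends the partition to its topic group, creating the group on first use.
  // A topic seen again later (e.g. an erroneous partition appended at the end)
  // reuses its existing group instead of producing a second entry.
  def addPartition(topic: String, data: PartitionData): Unit =
    byTopic.getOrElseUpdate(topic, TopicResponse(topic, mutable.ArrayBuffer.empty)).partitions += data

  // Already grouped: the response can be built from this directly.
  def responses: Seq[TopicResponse] = byTopic.values.toSeq
}
```

If partitions were guaranteed to arrive sorted by topic, checking only the last group would avoid the map lookup; the map-based version does not rely on any ordering.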
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588075100



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
       Would it make sense to replace this with `errorCode != Errors.NONE.code`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()

Review comment:
       Could we mutate the response instead of creating a new one?
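A minimal sketch of what mutating in place could look like. It uses a hypothetical, simplified partition entry whose `records` and `magic` fields are `var`s; in the generated classes, setters such as `setRecords` would play that role. `PartitionEntry`, `InPlaceDownConvert` and the placeholder `downConvert` are illustrative only, not Kafka code.

```scala
// Hypothetical, simplified partition entry. `records` stands in for the record
// batch and `magic` for the message format version it is encoded in.
final class PartitionEntry(val topic: String, val partitionIndex: Int,
                           var records: Vector[String], var magic: Int)

object InPlaceDownConvert {
  // Placeholder down-conversion: tags each record with the target version.
  private def downConvert(records: Vector[String], toMagic: Int): Vector[String] =
    records.map(r => s"v$toMagic:$r")

  // Overwrites the fields of existing entries instead of copying every entry
  // into a freshly allocated response. Entries the client can already read
  // are left untouched.
  def convertAll(entries: Seq[PartitionEntry], clientMagic: Int): Unit =
    entries.foreach { e =>
      if (e.magic > clientMagic) {
        e.records = downConvert(e.records, clientMagic)
        e.magic = clientMagic
      }
    }
}
```

This avoids allocating a second set of topic and partition objects, but it assumes nothing reads the unconverted values after the conversion has run.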

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)

Review comment:
       It's a pity that we have to recreate the `TopicPartition` object here. We have to do the same in a couple of other places... I am not sure about the impact, though. What's your take on this?
   
   I wonder if we could cache it in the response. We could extend the automated protocol to support "transient fields" (a field which is declared but not serialized) and store it there.
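A minimal sketch of the "transient field" idea, using hypothetical stand-ins rather than generated protocol classes: the declared wire fields are serialized, while the `TopicPartition` is built once on first access and never written out. For brevity the topic name is passed to the partition directly; in the real schema it lives on the enclosing topic response. Whether the automated protocol could generate something like this is the open question above.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical response partition. `topic` and `partitionIndex` model declared
// wire fields; `topicPartition` models a transient field that is built once,
// on first access, and is not part of the serialized form.
final class CachedPartitionData(val topic: String, val partitionIndex: Int) {
  lazy val topicPartition: TopicPartition = TopicPartition(topic, partitionIndex)

  // Only the wire fields are written (toy format for illustration).
  def serialize: String = s"$topic:$partitionIndex"
}
```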

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }

Review comment:
       forKeyValue?







[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588094923



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)

Review comment:
       Yep, we make many copies when processing a fetch request:
   
   1. merge `responsePartitionData` with errors
   1. down-convert fetch data
   1. regroup data
   
   This patch aims to fix 3; the others are tracked by https://issues.apache.org/jira/browse/KAFKA-12387
   
   At any rate, I will take a look at the other cases if that does not make the patch too big.







[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588138822



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()

Review comment:
       done







[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588090869



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
       The `error` is also used for logging.
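Both points can be reconciled by comparing the raw code, as suggested above, and resolving the error only inside the branch that logs. A minimal sketch with hypothetical helpers: the lookup table stands in for `Errors.forCode(...).exceptionName` and is not Kafka's `Errors` class.

```scala
// Hypothetical error-handling helpers; not Kafka's Errors class.
object ErrorCheck {
  val NoneCode: Short = 0

  // Hypothetical lookup standing in for Errors.forCode(code).exceptionName.
  // Codes 3 and 6 are UNKNOWN_TOPIC_OR_PARTITION and NOT_LEADER_OR_FOLLOWER.
  private val exceptionNames: Map[Short, String] = Map(
    (3: Short) -> "UnknownTopicOrPartitionException",
    (6: Short) -> "NotLeaderOrFollowerException"
  )

  def exceptionName(code: Short): String =
    exceptionNames.getOrElse(code, s"UnknownErrorCode($code)")

  // The common no-error case is a plain Short comparison; the name lookup
  // only happens when there is something to log.
  def logIfFailed(partition: String, errorCode: Short, log: String => Unit): Unit =
    if (errorCode != NoneCode)
      log(s"Fetch on partition $partition failed due to ${exceptionName(errorCode)}")
}
```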







[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-792943140


   > Thanks for the PR. Can you check the perf impact of these changes?
   
   Sure. I will add benchmark results tomorrow.





[GitHub] [kafka] dajac commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-791199822


   Nice PR! I will take a look at it on Monday.





[GitHub] [kafka] jolshan commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-801187849


   @chia7712 I'm rewriting https://github.com/apache/kafka/pull/9944 to use the autogenerated structures based on this PR. Just pushed a version that simplifies the unresolved topic ID handling. I tried to make it easier to build the fetch response using the data object. 





[GitHub] [kafka] dajac commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588093454



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }

Review comment:
       Ah, right. I misread it, sorry.







[GitHub] [kafka] jolshan commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-802975219


   > sounds good. If the new approach is very different from #9944, please open a new PR in order to compare them :)
   
   @chia7712 I've updated the code. I think this is the direction we want, so I didn't open a new PR.
   Biggest changes to the structure are in these commits:
   https://github.com/apache/kafka/pull/9944/commits/5a0a6d6a9e34a77e7da8b2b320b52fdc589c2e8d
   https://github.com/apache/kafka/pull/9944/commits/a0b2bc96b125d08cb4b9d47c6826e60a85cae968
   https://github.com/apache/kafka/pull/9944/commits/ace90b07d46a596b0d630b4ff8821a81eb1a01e4
   
   The idea is that FetchSession can now generate a list of the unresolvedTopics' `FetchResponseData.FetchableTopicResponse`. Hopefully from there, it is not too difficult to combine with your approach here.
   
   But let me know if it's hard to read. I can open a new PR and revert the changes on the old one.





[GitHub] [kafka] ijuma commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-794226627


   @chia7712 I was thinking about the jmh microbenchmarks that stress fetch, fetch session and so on.





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588049196



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -411,54 +436,40 @@ class IncrementalFetchContext(private val time: Time,
     }
   }
 
-  // Iterator that goes over the given partition map and selects partitions that need to be included in the response.
-  // If updateFetchContextAndRemoveUnselected is set to true, the fetch context will be updated for the selected
-  // partitions and also remove unselected ones as they are encountered.
-  private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,

Review comment:
       The `iterator` is unnecessary, since we have to materialize the selected partitions into a list anyway in order to calculate the message size.
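The reasoning can be illustrated with a small sketch using hypothetical types, not `FetchSession` code. An `Iterator` can be traversed only once, so when the selected partitions must be walked once to compute the message size and again to build the response, they end up materialized in a collection anyway. The inclusion predicate in the usage is a toy placeholder for the incremental-fetch selection rule.

```scala
// Hypothetical selected partition with the size its data contributes.
final case class SelectedPartition(topic: String, partition: Int, sizeInBytes: Int)

object MaterializeOnce {
  // Selects the partitions to include and materializes them into a List,
  // which (unlike an Iterator) can be traversed any number of times.
  def select(all: Seq[SelectedPartition], include: SelectedPartition => Boolean): List[SelectedPartition] =
    all.filter(include).toList

  // First traversal: compute the size (toy sum of per-partition sizes).
  def messageSize(selected: List[SelectedPartition]): Int =
    selected.map(_.sizeInBytes).sum

  // Second traversal: build the response entries (here just "topic-partition" keys).
  def responseKeys(selected: List[SelectedPartition]): List[String] =
    selected.map(p => s"${p.topic}-${p.partition}")
}
```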







[GitHub] [kafka] ijuma commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r589635348



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3368,8 +3396,16 @@ object KafkaApis {
   private[server] def sizeOfThrottledPartitions(versionId: Short,
                                                 unconvertedResponse: FetchResponse,
                                                 quota: ReplicationQuotaManager): Int = {
-    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
-      .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
+    val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
       I see, I guess this changed once we moved to the auto generated fetch.
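With the response grouped by topic, sizing the throttled partitions means building a topic-grouped subset rather than filtering a flat map. A minimal sketch with hypothetical stand-in types: `isThrottled` stands in for `quota.isThrottled`, and `sizeOf` is a toy sum, not the real serialized-size computation.

```scala
// Hypothetical stand-ins for the generated topic and partition response types.
final case class Partition(index: Int, sizeInBytes: Int)
final case class Topic(name: String, partitions: Seq[Partition])

object ThrottledSize {
  // Keeps only throttled partitions and drops topics left with none, so the
  // result has the same topic-grouped shape as the full response.
  def throttledOnly(topics: Seq[Topic], isThrottled: (String, Int) => Boolean): Seq[Topic] =
    topics.flatMap { t =>
      val kept = t.partitions.filter(p => isThrottled(t.name, p.index))
      if (kept.isEmpty) None else Some(t.copy(partitions = kept))
    }

  // Toy size: sum of partition sizes (the real code sizes the serialized message).
  def sizeOf(topics: Seq[Topic]): Int =
    topics.map(t => t.partitions.map(_.sizeInBytes).sum).sum
}
```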







[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-801197578


   > I'm rewriting #9944 to use the autogenerated structures based on this PR. Just pushed a version that simplifies the unresolved topic ID handling. I tried to make it easier to build the fetch response using the data object. Going to try to build the response using the data object in most places today and I can push that version as soon as I can.
   
   sounds good. If the new approach is very different from #9944, please open a new PR in order to compare them :)





[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-794221218


   @ijuma The results of the performance tests are attached. They do not show any obvious performance regression. I will run more tests tomorrow.





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588050163



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -795,7 +795,17 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+      val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
       This is the main purpose of this PR: `KafkaApis` now keeps the fetch data grouped by topic.







[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588091547



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }

Review comment:
       `erroneous` is not a map, so we can't use `forKeyValue`.
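A minimal sketch of why, using a hypothetical map-only extension modelled on the idea behind `forKeyValue` (not Kafka's actual implementation). The method is only available on `Map`, while a sequence of pairs like `erroneous` is iterated with a pattern-matching `foreach`, as the PR does.

```scala
object ForKeyValueSketch {
  // Hypothetical extension method: defined for Map only, so calling it on a
  // Seq of pairs does not compile.
  implicit class MapOps[K, V](self: Map[K, V]) {
    def forKeyValue(f: (K, V) => Unit): Unit =
      self.foreach { case (k, v) => f(k, v) }
  }
}
```

For a `Seq[(TopicPartition, PartitionData)]`, destructuring with `foreach { case (tp, data) => ... }` is the idiomatic equivalent.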







[GitHub] [kafka] ijuma commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-791487213


   Thanks for the PR. Can you check the perf impact of these changes?





[GitHub] [kafka] jolshan edited a comment on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-801187849


   @chia7712 I'm rewriting https://github.com/apache/kafka/pull/9944 to use the auto-generated structures, based on this PR. I just pushed a version that simplifies the unresolved topic ID handling and makes it easier to build the fetch response from the data object. Today I'll try to build the response from the data object in most places, and I'll push that version as soon as I can.





[GitHub] [kafka] ijuma commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r589631664



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3368,8 +3396,16 @@ object KafkaApis {
   private[server] def sizeOfThrottledPartitions(versionId: Short,
                                                 unconvertedResponse: FetchResponse,
                                                 quota: ReplicationQuotaManager): Int = {
-    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
-      .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
+    val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
       Do we have to create a new collection? The previous version did not.







[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-794237954


   > I was thinking about the jmh microbenchmarks that stress fetch, fetch session and so on.
   
   Will do.





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588095220



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
       Makes sense. Will do.
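For reference, here is a minimal sketch of the nested conversion loop in the diff above. It assumes simplified, hypothetical types (`ConvTopic`, `ConvPartition`) and a hypothetical `maybeConvert` that only lowers a record-format `magic` byte; the real `maybeConvertFetchedData` performs actual down-conversion. What it shows is that the converted response keeps the per-topic grouping, with each list pre-sized from its unconverted counterpart.

```scala
import java.util

// Simplified, hypothetical stand-ins for FetchResponseData.PartitionData and FetchableTopicResponse.
final case class ConvPartition(partitionIndex: Int, magic: Byte)
final class ConvTopic(val topic: String, val partitions: util.List[ConvPartition])

object DownConvertSketch {
  // Hypothetical stand-in for maybeConvertFetchedData: lowers the record format version if needed.
  def maybeConvert(p: ConvPartition, maxMagic: Byte): ConvPartition =
    if (p.magic > maxMagic) p.copy(magic = maxMagic) else p

  def convert(unconverted: util.List[ConvTopic], maxMagic: Byte): util.List[ConvTopic] = {
    // Pre-size each list from its source so no resizing is needed while copying.
    val converted = new util.ArrayList[ConvTopic](unconverted.size)
    unconverted.forEach { topicData =>
      val partitions = new util.ArrayList[ConvPartition](topicData.partitions.size)
      converted.add(new ConvTopic(topicData.topic, partitions))
      topicData.partitions.forEach(p => partitions.add(maybeConvert(p, maxMagic)))
    }
    converted
  }
}
```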







[GitHub] [kafka] jolshan commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-800648300


   @chia7712 is there any work in progress on a `KafkaApis.handleFetchRequest` benchmark? I suspect it would be similar to, but maybe a bit harder than, what I did for the LeaderAndIsr version in https://github.com/apache/kafka/pull/10071 (swapping `ReplicaManager` for `FetchManager`, etc.). As you can probably guess, this benchmark would be helpful for https://github.com/apache/kafka/pull/9944 :)





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588049874



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -294,14 +294,24 @@ trait FetchContext extends Logging {
     */
   def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse
 
-  def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
-    FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+  def partitionsToLogString(topics: FetchSession.RESP_MAP): String = {

Review comment:
       This method is only used for logging (at DEBUG level), so it should be fine to iterate through the whole collection.
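Here is a sketch of what a grouped `partitionsToLogString` might look like. It assumes a topic-to-partition-index map in place of `FetchSession.RESP_MAP`. It also assumes, as the pre-change signature taking `isTraceEnabled` suggests, that the full list is printed only at TRACE and a count is printed otherwise. Either branch walks every topic, which is the cost being accepted here.

```scala
import scala.collection.immutable.ListMap

object LogStringSketch {
  // Hypothetical input: topic name -> partition indexes, grouped the way the response map now is.
  def partitionsToLogString(topics: ListMap[String, Seq[Int]], traceEnabled: Boolean): String =
    if (traceEnabled)
      // Full listing: every topic-partition, in topic insertion order.
      topics.iterator
        .flatMap { case (topic, parts) => parts.iterator.map(p => s"$topic-$p") }
        .mkString("(", ", ", ")")
    else
      // Summary: still iterates every topic to count partitions, acceptable for debug logging.
      s"${topics.valuesIterator.map(_.size).sum} partition(s)"
}
```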







[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-800745691


   > is there any work in progress for a KafkaApis.handleFetchRequest test? I suspect it would be similar but maybe a bit harder than what I did for the LeaderAndIsr version #10071 (trading replicamanager for fetchmanager, etc). This benchmark would be helpful for #9944 as you could probably guess :)
   
   This PR is blocked by #9944. This PR (along with other related issues) aims to remove all extra collection creation by using the auto-generated data. In #9944 we have to create a lot of collections to handle the topic ID in the fetch request, so I need to rethink the value (and approach) of this PR :) 





[GitHub] [kafka] chia7712 commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-795066426


   @ijuma the JMH results for the fetch session are attached. I also tried to write a JMH benchmark that stresses fetch, but `KafkaApis.handleFetchRequest` is hard to turn into a JMH benchmark; it would require a lot of changes...





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r589633902



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3368,8 +3396,16 @@ object KafkaApis {
   private[server] def sizeOfThrottledPartitions(versionId: Short,
                                                 unconvertedResponse: FetchResponse,
                                                 quota: ReplicationQuotaManager): Int = {
-    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
-      .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
+    val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
       > The previous version did not.
   
   It created a copy before as well, since we have to add a collection to the auto-generated data to calculate the size. See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java#L133
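Here is a minimal sketch of the filtering step under discussion. It assumes simplified, hypothetical types (`SizedTopic`, `SizedPartition`), a hypothetical `isThrottled` predicate standing in for `quota.isThrottled`, and a `sizeOf` that just sums byte counts rather than serializing for a given `versionId`. It builds a new topic list containing only the throttled partitions, which is the copy being discussed.

```scala
import java.util

// Simplified, hypothetical stand-ins for the auto-generated fetch response types.
final case class SizedPartition(partitionIndex: Int, sizeInBytes: Int)
final class SizedTopic(val topic: String, val partitions: util.List[SizedPartition])

object ThrottledSizeSketch {
  // Copy only the throttled partitions into a new topic list, skipping topics with none.
  def throttledTopics(responses: util.List[SizedTopic],
                      isThrottled: (String, Int) => Boolean): util.List[SizedTopic] = {
    val topicResponses = new util.ArrayList[SizedTopic]()
    responses.forEach { topic =>
      val kept = new util.ArrayList[SizedPartition]()
      topic.partitions.forEach { p =>
        if (isThrottled(topic.topic, p.partitionIndex)) kept.add(p)
      }
      if (!kept.isEmpty) topicResponses.add(new SizedTopic(topic.topic, kept))
    }
    topicResponses
  }

  // Hypothetical size: a plain byte sum, not a version-specific serialized size.
  def sizeOf(topics: util.List[SizedTopic]): Int = {
    var total = 0
    topics.forEach(t => t.partitions.forEach(p => total += p.sizeInBytes))
    total
  }
}
```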







[GitHub] [kafka] dajac commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588094059



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
       Right. I was thinking that we could convert it to Errors only if we need it.
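Here is a sketch of converting to an error object only when it is needed. It assumes a hypothetical `SketchError` in place of `Errors`; codes 0 and 6 are meant to mirror `NONE` and `NOT_LEADER_OR_FOLLOWER`, but treat that mapping as an assumption. The raw `Short` code is compared with the no-error code first, and `forCode` is resolved only on the error path.

```scala
// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
sealed abstract class SketchError(val code: Short, val exceptionName: String)

object SketchError {
  case object NONE extends SketchError(0, "")
  case object NOT_LEADER_OR_FOLLOWER extends SketchError(6, "NotLeaderOrFollowerException")

  private val byCode: Map[Short, SketchError] =
    Seq[SketchError](NONE, NOT_LEADER_OR_FOLLOWER).map(e => e.code -> e).toMap

  def forCode(code: Short): SketchError = byCode(code)
}

object LazyErrorSketch {
  // Input: (topic, partition, raw error code); output: one debug message per failed partition.
  def failedPartitionMessages(partitions: Seq[(String, Int, Short)]): Seq[String] =
    partitions.collect {
      // Compare raw codes first; resolve the error object only on the error path.
      case (topic, partition, errorCode) if errorCode != SketchError.NONE.code =>
        val error = SketchError.forCode(errorCode)
        s"partition $topic-$partition failed due to ${error.exceptionName}"
    }
}
```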



