You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/19 07:45:59 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r427095614



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int,
             // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
             val member = group.get(memberId)
             completeAndScheduleNextHeartbeatExpiration(group, member)
-            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+            partitionsToComplete ++= groupManager.storeOffsets(
+              group = group,
+              consumerId = memberId,
+              offsetMetadata = offsetMetadata,
+              responseCallback = responseCallback,
+              completeDelayedRequests = false)
 
           case CompletingRebalance =>
             // We should not receive a commit request if the group has not completed rebalance;
             // but since the consumer's member.id and generation is valid, it means it has received
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
             responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
-
           case _ =>
             throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
         }
       }
     }
+    completeDelayedRequests(partitionsToComplete)

Review comment:
       @hachikuji  thanks for reviews!
   
   > Async completion: hachikuji@3bee4ac
   
   we are on the same page :) (my previous comment https://github.com/apache/kafka/pull/6915#issuecomment-626292057)
   
   > Only posting in case it's useful to see how some of these alternatives might have looked. I'm ok with the approach here, but I do wish we could come up with a simpler API. One thought I had is whether we could make the need for external completion more explicit. For example, maybe appendRecords could return some kind of object which encapsulates purgatory completion.
   
   this style LGTM :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org