Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/28 07:08:33 UTC

[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is still called here, the problem will recur: the user's call to the `poll` method of `KafkaConsumer` will still block, because `rebalanceConfig.rebalanceTimeoutMs` may be much larger than `pollDuration`.
   
   Suggestion:
   1. Promote the `future` variable in `onJoinPrepare` to an instance variable of `ConsumerCoordinator`. The name could tentatively be `rebalanceAutoCommitFuture`, with an initial value of `null`.
       `private RequestFuture<Void> rebalanceAutoCommitFuture = null;`
   
   2. Refactor the `onJoinPrepare` method so that `rebalanceAutoCommitFuture` can complete over multiple calls to the user's `poll` method, without blocking any of them.
   ```
   boolean onJoinPrepareAsyncCommitCompleted = false;
   if (autoCommitEnabled && rebalanceAutoCommitFuture == null) {
       // async commit offsets prior to rebalance if auto-commit enabled
       rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
   }
   if (rebalanceAutoCommitFuture != null) {
       client.poll(rebalanceAutoCommitFuture, time.timer(0));
   }
   // return true when
   // 1. future is null, which means no commit request sent, so it is still considered completed
   // 2. offset commit completed
   // 3. offset commit failed with non-retriable exception
   if (rebalanceAutoCommitFuture == null) {
       onJoinPrepareAsyncCommitCompleted = true;
   } else if (rebalanceAutoCommitFuture.succeeded()) {
       onJoinPrepareAsyncCommitCompleted = true;
       rebalanceAutoCommitFuture = null;
   } else if (rebalanceAutoCommitFuture.failed() && !rebalanceAutoCommitFuture.isRetriable()) {
       log.error("Asynchronous auto-commit of offsets failed: {}", rebalanceAutoCommitFuture.exception().getMessage());
       onJoinPrepareAsyncCommitCompleted = true;
       rebalanceAutoCommitFuture = null;
   }
   ```
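
   To illustrate the pattern outside of Kafka, here is a minimal standalone sketch of the same idea: the pending commit is stored in a field, and each call only checks whether it has finished instead of waiting for it. Everything in it is an assumption made for illustration: `JoinPrepareSketch`, `sendCommitAsync`, `commitsSent`, and `lastSent` are hypothetical names, `CompletableFuture` stands in for Kafka's internal `RequestFuture`, and retriable failures are not modeled (any failure is treated as final).

   ```java
   import java.util.concurrent.CompletableFuture;

   /**
    * Hypothetical stand-in for the coordinator state described above.
    * CompletableFuture replaces Kafka's internal RequestFuture.
    */
   final class JoinPrepareSketch {
       // Kept across calls so a pending commit survives successive poll() invocations.
       private CompletableFuture<Void> rebalanceAutoCommitFuture = null;
       private final boolean autoCommitEnabled;

       // Illustration-only hooks: how many commits were sent, and the latest one.
       int commitsSent = 0;
       CompletableFuture<Void> lastSent = null;

       JoinPrepareSketch(boolean autoCommitEnabled) {
           this.autoCommitEnabled = autoCommitEnabled;
       }

       // Assumed replacement for maybeAutoCommitOffsetsAsync(): returns a pending future.
       private CompletableFuture<Void> sendCommitAsync() {
           commitsSent++;
           lastSent = new CompletableFuture<>();
           return lastSent;
       }

       /** Returns true once the commit finished or was never needed; never blocks. */
       boolean onJoinPrepare() {
           if (autoCommitEnabled && rebalanceAutoCommitFuture == null) {
               rebalanceAutoCommitFuture = sendCommitAsync();
           }
           if (rebalanceAutoCommitFuture == null) {
               return true; // no commit request was sent
           }
           if (!rebalanceAutoCommitFuture.isDone()) {
               return false; // still in flight; the next call checks again
           }
           if (rebalanceAutoCommitFuture.isCompletedExceptionally()) {
               System.err.println("Asynchronous auto-commit of offsets failed");
           }
           rebalanceAutoCommitFuture = null; // reset so a later rebalance starts fresh
           return true;
       }
   }
   ```

   In this sketch, repeated calls to `onJoinPrepare()` while the commit is in flight return `false` without sending a second request, and the first call after the future completes returns `true` and clears the field.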


