Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/21 22:05:55 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

guozhangwang commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r773470459



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,22 +1084,14 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
-    private void maybeAutoCommitOffsetsSync(Timer timer) {
+    private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {

Review comment:
       I think these two overloaded `maybeAutoCommitOffsetsAsync` functions can be merged. Compared with the overload that returns `void`, this function does two additional things:
   
   1) `client.pollNoWakeup();` -> `commitOffsetsAsync` already calls this internally (see https://github.com/apache/kafka/pull/11340/files#diff-0029e982555d1fae10943b862924da962ca8e247a3070cded92c5f5a5960244fR982), so the extra call is not needed.
   
   2) `invokeCompletedOffsetCommitCallbacks();` -> I think this is not required in the async call.
   
   As a result, we can keep a single `maybeAutoCommitOffsetsAsync` function that returns the future.
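
A minimal sketch of what a single merged entry point could look like. `AutoCommitSketch`, `sendCommit`, `periodicCommit`, and the use of `CompletableFuture` in place of `RequestFuture` are assumptions made for illustration, not the code in this PR; the only thing taken from the comment above is that one `maybeAutoCommitOffsetsAsync` returns the future and callers that do not need it ignore the result.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the coordinator; this is not
// Kafka's ConsumerCoordinator.
final class AutoCommitSketch {
    private final boolean autoCommitEnabled;
    int commitsSent = 0;

    AutoCommitSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    // Single entry point: returns the commit future, or null when
    // auto-commit is disabled and no request was sent.
    CompletableFuture<Void> maybeAutoCommitOffsetsAsync() {
        if (!autoCommitEnabled) {
            return null;
        }
        return sendCommit();
    }

    // Hypothetical placeholder for the real asynchronous commit; it
    // completes immediately so the sketch needs no broker.
    private CompletableFuture<Void> sendCommit() {
        commitsSent++;
        return CompletableFuture.completedFuture(null);
    }

    // A caller that previously used the void overload simply discards
    // the returned future.
    void periodicCommit() {
        maybeAutoCommitOffsetsAsync();
    }
}
```

With this shape, a caller such as `onJoinPrepare` can inspect the returned future, while the periodic path keeps its fire-and-forget behaviour.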

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +692,17 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitCompleted = false;
+        // async commit offsets prior to rebalance if auto-commit enabled
+        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // return true when
+        // 1. future is null, which means no commit request sent, so it is still considered completed
+        // 2. offset commit completed
+        // 3. offset commit failed with non-retriable error
+        if (future == null || future.succeeded() || (future.failed() && !future.isRetriable()))

Review comment:
       For case 3, we should log an ERROR entry stating that we are proceeding even though the commit failed, and that this may result in duplicate data being consumed. In such cases the user should be notified.
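
The suggestion can be sketched as follows. `CommitFutureSketch` is a hypothetical stand-in for `RequestFuture<Void>` that models only the three calls visible in the diff, `commitCompleted` and `future` are illustrative names, and the error message wording is an assumption. The list of messages stands in for a real `log.error(...)` call so the example has no logging dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RequestFuture<Void>; only the calls that
// appear in the diff are modelled.
interface CommitFutureSketch {
    boolean succeeded();
    boolean failed();
    boolean isRetriable();
}

final class JoinPrepareSketch {
    // Collected here instead of being logged; real code would use log.error(...).
    static final List<String> errors = new ArrayList<>();

    // Returns true when the rebalance may proceed.
    static boolean commitCompleted(CommitFutureSketch future) {
        // Case 1: no commit request was sent, so there is nothing to wait for.
        if (future == null) {
            return true;
        }
        // Case 2: the offset commit completed successfully.
        if (future.succeeded()) {
            return true;
        }
        // Case 3: non-retriable failure. Proceed, but record an ERROR so the
        // user knows that records may be consumed again.
        if (future.failed() && !future.isRetriable()) {
            errors.add("Asynchronous auto-commit of offsets failed with a non-retriable error; "
                + "proceeding with the rebalance, so some records may be consumed again");
            return true;
        }
        // Still in flight, or failed with a retriable error: not complete yet.
        return false;
    }

    // Test helper that builds a future with fixed answers.
    static CommitFutureSketch future(boolean succeeded, boolean failed, boolean retriable) {
        return new CommitFutureSketch() {
            @Override public boolean succeeded() { return succeeded; }
            @Override public boolean failed() { return failed; }
            @Override public boolean isRetriable() { return retriable; }
        };
    }
}
```

Splitting the combined condition into three branches is what makes room for the ERROR entry: only the non-retriable failure path logs, while the null and success paths stay silent.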



