You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/04 04:08:49 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

guozhangwang commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742513843



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {

Review comment:
       It's a bit awkward to modify `onJoinPrepareAsyncCommitFuture` inside the `maybeAutoCommitOffsetsAsync` function since the function name itself indicate a general purpose, but specifically for join-prepare --- though I understand today it is indeed only used for that caller.
   
   How about letting the `maybeAutoCommitOffsetsAsync` to return the future instead of the boolean, and then let the caller a.k.a. the `onJoinPrepare` today to check if the future is completed or not.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -420,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 // need to set the flag before calling onJoinPrepare since the user callback may throw
                 // exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                if (!onJoinPrepare(generation.generationId, generation.memberId))

Review comment:
       This logic is a bit complicated now, thinking about it twice I feel it may read simpler as this:
   
   ```
   if (needsJoinPrepare) {
       try {
           if (onJoinPrepare(generation.generationId, generation.memberId))
               needsJoinPrepare = false;
           else
               return false;
       } finally {
               // if onJoinPrepare throws an exception, it would be from the rebalance listener.
               // next time we would then not retry {@code onJoinPrepare} any more but proceed the join-group procedure.
               needsJoinPrepare = false;
       }
   }
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();

Review comment:
       Why invoke the callbacks twice instead of only once below?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));

Review comment:
       I think upon close(), we can also use `maybeAutoCommitOffsetsAsync` and then we can remove the whole function fo `maybeAutoCommitOffsetsSync`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1054,11 +1067,11 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
-    private void doAutoCommitOffsetsAsync() {

Review comment:
       nit: I know this is not introduced by this PR, but could we rename it to `autoCommitOffsetsAsync` instead?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (onJoinPrepareAsyncCommitFuture == null)

Review comment:
       I cannot understand this logic clearly.. my original thought is that, we do not reference the `onJoinPrepareAsyncCommitFuture` here at all, just create a future and return to the caller.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (onJoinPrepareAsyncCommitFuture == null)
+                onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync();
+            if (onJoinPrepareAsyncCommitFuture == null)
+                return true;
+
+            client.pollNoWakeup();
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (!onJoinPrepareAsyncCommitFuture.isDone())
+                return false;
+            if (onJoinPrepareAsyncCommitFuture.succeeded()) {
+                onJoinPrepareAsyncCommitFuture = null;
+                return true;
+            }
+            if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable())

Review comment:
       I think we can move this check to the caller (onJoinPrepare).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org