Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/04 06:23:15 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r742541638



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitSucceeded;
+        try {
+            // async commit offsets prior to rebalance if auto-commit enabled
+            onJoinPrepareAsyncCommitSucceeded = maybeAutoCommitOffsetsAsync();
+        } catch (Exception e) {
+            onJoinPrepareAsyncCommitFuture = null;
+            onJoinPrepareAsyncCommitSucceeded = true;

Review comment:
       Why is this exception case treated as `CommitSucceeded = true`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1053,6 +1058,49 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {

Review comment:
       Nice test to simulate the scenario you faced.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {

Review comment:
       There's already another method called `maybeAutoCommitOffsetsAsync`, but with a `void` return type. Are you sure this compiles?
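
For context on the question above: Java resolves overloads by parameter list only, never by return type, so whether the new method compiles depends on whether the existing method takes the same parameters. A minimal sketch with hypothetical names (`OverloadSketch`, `maybeCommit`, and `lastCall` are illustrative and are not the actual `ConsumerCoordinator` members):

```java
public class OverloadSketch {
    // Records which overload ran last, for illustration only.
    public static String lastCall = "";

    // Hypothetical stand-in for an existing method: void return type, one parameter.
    public static void maybeCommit(long now) {
        lastCall = "void(long)";
    }

    // Legal overload: the parameter list differs (no parameters),
    // so a different return type is allowed.
    public static boolean maybeCommit() {
        lastCall = "boolean()";
        return true;
    }

    // Not legal: this has the same parameter list as the method above and
    // differs only in return type, so javac rejects it if uncommented.
    // public static void maybeCommit() { }

    public static void main(String[] args) {
        maybeCommit(0L);
        System.out.println(lastCall);
        boolean ok = maybeCommit();
        System.out.println(lastCall + " -> " + ok);
    }
}
```

If the existing method takes no parameters either, the two declarations clash and one of them would need a different name.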

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -403,7 +404,7 @@ private void closeHeartbeatThread() {
      *
      * @param timer Timer bounding how long this method can block
      * @throws KafkaException if the callback throws exception
-     * @return true iff the operation succeeded
+     * @return true if the operation succeeded

Review comment:
       why this change?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -151,6 +151,11 @@
             put(topic2, 1);
         }
     });
+    private MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {

Review comment:
       Since this variable is only used in the newly added test, could we move it into the test?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,6 +1086,33 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
+    private boolean maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled) {
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (onJoinPrepareAsyncCommitFuture == null)
+                onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync();
+            if (onJoinPrepareAsyncCommitFuture == null)
+                return true;
+
+            client.pollNoWakeup();
+            invokeCompletedOffsetCommitCallbacks();
+
+            if (!onJoinPrepareAsyncCommitFuture.isDone())
+                return false;
+            if (onJoinPrepareAsyncCommitFuture.succeeded()) {
+                onJoinPrepareAsyncCommitFuture = null;
+                return true;
+            }
+            if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable())

Review comment:
       +1
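
The hunk above follows a common non-blocking pattern: start an asynchronous operation once, then have the caller check back repeatedly until it completes. A rough sketch of that idea, using `java.util.concurrent.CompletableFuture` in place of Kafka's own future type; the class name, the `startCommit` supplier, and the choice to let the caller proceed on any completion (success or failure) are assumptions for illustration and do not reproduce the PR's failure handling:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class AsyncCommitPollSketch {
    // null means no commit is currently in flight.
    private CompletableFuture<Void> pending;

    /**
     * Returns true when the caller may proceed, false when it should call again later.
     * startCommit is a hypothetical hook that may return null when there is nothing to commit.
     */
    public boolean maybeCommitAsync(Supplier<CompletableFuture<Void>> startCommit) {
        if (pending == null)
            pending = startCommit.get(); // start at most one commit
        if (pending == null)
            return true;                 // nothing to commit
        if (!pending.isDone())
            return false;                // still in flight, do not block
        // Assumption: any completion clears the state and lets the caller move on.
        pending = null;
        return true;
    }
}
```

The caller never blocks on the future; it simply re-invokes the method on its next pass through the loop, which is what allows a surrounding poll loop to remain responsive to its own timeout.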



