Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/26 09:27:29 UTC

[GitHub] [kafka] aiquestion opened a new pull request, #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

aiquestion opened a new pull request, #12349:
URL: https://github.com/apache/kafka/pull/12349

   I think this was introduced in https://issues.apache.org/jira/browse/KAFKA-13310.
   
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752
   
   We don't wait for the client to receive the offset commit response here, so onJoinPrepareAsyncCommitCompleted stays false during a cooperative rebalance, and the client keeps looping: it invokes onJoinPrepare and commits offsets again on every call.
   
   I think EAGER mode doesn't have this problem, because it revokes the partitions even when onJoinPrepareAsyncCommitCompleted=false and does not try to commit again in the next round.
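To make the failure mode concrete, here is a deliberately simplified Java model. It is not Kafka's code: `FakeClient`, `sendCommit()`, and the two `onJoinPrepare*` methods are hypothetical, and each call is assumed to send a fresh commit (the behavior the issue title describes). It shows that if the future is checked before its response is polled, every invocation sees a pending commit, whereas polling first lets the very first call complete.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the consumer's network client (NOT Kafka's API):
// a commit's future only completes when poll() delivers its response.
class FakeClient {
    private final Queue<CompletableFuture<Void>> inFlight = new ArrayDeque<>();
    int sent = 0;

    CompletableFuture<Void> sendCommit() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        inFlight.add(future);
        sent++;
        return future;
    }

    // Deliver a successful response for the oldest in-flight commit, if any.
    void poll() {
        CompletableFuture<Void> future = inFlight.poll();
        if (future != null) {
            future.complete(null);
        }
    }
}

class JoinPrepareModel {
    // Pre-fix shape: send a fresh async commit and check it immediately.
    static boolean onJoinPrepareNoWait(FakeClient client) {
        CompletableFuture<Void> future = client.sendCommit();
        return future.isDone(); // always false: the response has not been polled yet
    }

    // Shape of the fix: poll for the response before deciding.
    static boolean onJoinPrepareWithWait(FakeClient client) {
        CompletableFuture<Void> future = client.sendCommit();
        client.poll();
        return future.isDone();
    }

    public static void main(String[] args) {
        FakeClient before = new FakeClient();
        for (int i = 0; i < 5; i++) { // capped for the demo; the reported loop has no cap
            if (onJoinPrepareNoWait(before)) {
                break;
            }
            before.poll(); // the old response arrives, but the next call checks a new future
        }
        System.out.println("without waiting, commits sent: " + before.sent);

        FakeClient after = new FakeClient();
        boolean completed = onJoinPrepareWithWait(after);
        System.out.println("with waiting, completed: " + completed + ", commits sent: " + after.sent);
    }
}
```

Running `main` should report 5 commits sent without waiting (only because the demo stops after 5 calls) and a single, completed commit with waiting.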
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189724526

   Backport to 3.3 and 3.2. cc @jsancio @mumrah 




[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908395237


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   @RivenSun2 Ah, yes, you are right.
   I think the problem in the previous issue, https://issues.apache.org/jira/browse/KAFKA-13310, was that `maybeAutoCommitOffsetsSync` falls into an infinite loop if it hits an `UNKNOWN_TOPIC_OR_PARTITION` error.
   But I think we still need to wait for the commitOffset response and try to commit the offsets successfully before joinGroup, so that there are no duplicate messages when everything works well.
   
   Your suggestion would work, but it would make KafkaConsumer.poll() busy-loop while the commitOffsetRequest is in flight.
   
   Do you think we could:
   * still wait for the response here
   * limit the maximum retry time of commitOffset in the onJoinPrepare function
   
   Thanks
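The two proposals (wait for the response, but cap how long onJoinPrepare keeps retrying) boil down to a small decision rule. Below is a hedged Java sketch of that rule; `CommitState`, `mayProceed`, and `callsUntilProceed` are hypothetical names, and the sketch assumes the cap is the rebalance timeout, which matches the `joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs)` line in the later revisions of this PR.

```java
import java.util.Arrays;

// Hypothetical summary of the in-flight auto-commit as onJoinPrepare sees it.
enum CommitState { NONE, PENDING, SUCCEEDED, FAILED_RETRIABLE, FAILED_FATAL }

final class JoinPrepareDecision {
    private JoinPrepareDecision() {}

    // true  -> onJoinPrepare may finish and the member can send JoinGroup
    // false -> call onJoinPrepare again (keep waiting for / retrying the commit)
    static boolean mayProceed(CommitState state, boolean joinPrepareTimerExpired) {
        switch (state) {
            case NONE:             // no commit was needed
            case SUCCEEDED:
            case FAILED_FATAL:     // non-retriable: log it and move on
                return true;
            case PENDING:
            case FAILED_RETRIABLE: // keep going, but never past the rebalance timeout
                return joinPrepareTimerExpired;
            default:
                throw new IllegalArgumentException("unexpected state " + state);
        }
    }

    // Counts onJoinPrepare calls until it may proceed. Each call observes the next
    // state while stepMs of simulated time passes. Returns -1 if the states run out.
    static int callsUntilProceed(CommitState[] observed, long stepMs, long rebalanceTimeoutMs) {
        long elapsedMs = 0;
        for (int i = 0; i < observed.length; i++) {
            elapsedMs += stepMs;
            if (mayProceed(observed[i], elapsedMs >= rebalanceTimeoutMs)) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        CommitState[] retryThenOk = {
            CommitState.PENDING, CommitState.FAILED_RETRIABLE, CommitState.SUCCEEDED };
        System.out.println(callsUntilProceed(retryThenOk, 100, 1_000)); // 3

        CommitState[] alwaysRetriable = new CommitState[20];
        Arrays.fill(alwaysRetriable, CommitState.FAILED_RETRIABLE);
        System.out.println(callsUntilProceed(alwaysRetriable, 100, 1_000)); // 10: cap reached
    }
}
```

The design point is that a retriable failure and a still-pending response are treated the same way: both keep the member in onJoinPrepare, and both are released by the same timer, so a persistent `UNKNOWN_TOPIC_OR_PARTITION` can no longer pin the consumer forever.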





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676827


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));

Review Comment:
   Fixed, thanks.





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922694170


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   Yes, I think there was a bug in my previous code; fixed it. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r923994624


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)

Review Comment:
   nit: joinPrepareTime -> joinPrepareTime[r]



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:

Review Comment:
    keep retrying the offset commit when: ->  [K]eep retrying[/waiting] the offset commit when:



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);

Review Comment:
   I think we should add a comment above L752, e.g.:
   `// We should complete onJoinPrepare before the rebalance timeout and continue to join the group, to avoid the member getting kicked out of the group`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +850,13 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;

Review Comment:
   We should update the timer before returning.
   `timer.update();`
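Both timer points in this review (bound the wait by whichever timer ends first, and call `update()` before returning) depend on how a cached timer behaves. The sketch below uses a hypothetical `MiniTimer`, not Kafka's class; it assumes, as this review comment implies, that the timer only observes elapsed time when `update()` is called, so a cached instance goes stale.

```java
import java.util.function.LongSupplier;

// Hypothetical minimal timer (not Kafka's class). It only sees time move
// when update() is called, so a cached instance can report stale values.
final class MiniTimer {
    private final LongSupplier clock;
    private final long deadlineMs;
    private long currentMs;

    MiniTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.currentMs = clock.getAsLong();
        this.deadlineMs = currentMs + timeoutMs;
    }

    void update() {
        currentMs = clock.getAsLong();
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentMs);
    }

    boolean isExpired() {
        return currentMs >= deadlineMs;
    }

    // Mirrors the pollTimer choice in the diff: wait on whichever timer ends first.
    static MiniTimer tighter(MiniTimer a, MiniTimer b) {
        return a.remainingMs() < b.remainingMs() ? a : b;
    }

    public static void main(String[] args) {
        long[] now = {0};
        MiniTimer callerTimer = new MiniTimer(() -> now[0], 100);        // e.g. the poll() timeout
        MiniTimer joinPrepareTimer = new MiniTimer(() -> now[0], 1_000); // e.g. the rebalance timeout
        System.out.println(tighter(callerTimer, joinPrepareTimer) == callerTimer); // true

        now[0] = 150;                                       // time passes inside client.poll(...)
        System.out.println(callerTimer.isExpired());        // false: stale until updated
        callerTimer.update();
        joinPrepareTimer.update();
        System.out.println(callerTimer.isExpired());        // true
        System.out.println(joinPrepareTimer.remainingMs()); // 850
    }
}
```

Without the `update()` calls, the caller would see a timer that still looks unexpired after the time spent polling, which is exactly why the timer should be refreshed before onJoinPrepare returns.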



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);

Review Comment:
   This retry behavior is already tested in `testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry`, right? We can remove it here and test the non-retriable exception directly.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);

Review Comment:
   1. Why do we need `time.sleep(150)` each time?
   2. Some of this code is unnecessary. We can simplify it to the code below:
   
   ```java
           Timer pollTimer = time.timer(100L);
           client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
           boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           assertFalse(res);
   
           pollTimer = time.timer(100L);
           client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
           res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           assertTrue(res);
   ```
   
   The same comments apply to the newly added tests below.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithInflightCommitOffestShouldKeepJoinAfterRebalanceTimeout() {

Review Comment:
   Now I see why you assert `coordinatorUnknown` at the end; sorry, I didn't realize that before. But I think we should remove this test, because if the coordinator is unknown we'll never complete the rebalance, which is not what we want to test here. Please remove it. Sorry, and thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithInflightCommitOffestShouldKeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(100);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(rebalanceTimeoutMs);
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        // commit offset should timeout and mark coordinator unknown
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);

Review Comment:
   duplicated test.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   What do we want to test in this test? Is it related to our change?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")

Review Comment:
   nit: Timed out while awaiting expected assignment [size] change to 1."
   The same comment applies to the next line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));

Review Comment:
   Why do we need `groupInstanceId`? Could we use the class variable `rebalanceConfig` directly here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -191,7 +191,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * @param memberId The identifier of this member in the previous group or "" if there was none
      * @return true If onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract boolean onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);

Review Comment:
   We should also add the `timer` param to the javadoc above.
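To illustrate the reviewer's point, here is a minimal sketch of the javadoc once `timer` is documented. The class name and the nested `Timer` interface are hypothetical placeholders, not Kafka's `AbstractCoordinator` or `org.apache.kafka.common.utils.Timer`. The wording of the `timer` description is only a suggestion, not the text the PR adopted. The `generation` description is elided because this excerpt does not show it.

```java
/**
 * Hypothetical, trimmed-down sketch of the abstract join-prepare hook.
 * The nested Timer interface is a placeholder, not Kafka's Timer class.
 */
public abstract class JoinPrepareJavadocSketch {
    // Placeholder for the real Timer type; only what this sketch needs.
    interface Timer {
        long remainingMs();
    }

    /**
     * @param timer      Timer bounding how long this call may block, for example while
     *                   waiting for an in-flight auto-commit of offsets to complete
     * @param generation (existing description, elided in this excerpt)
     * @param memberId   The identifier of this member in the previous group or "" if there was none
     * @return true If onJoinPrepare async commit succeeded, false otherwise
     */
    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);
}
```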



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();

Review Comment:
   Sorry, I think we don't need to keep updating `timer`, since we don't check timer expiration within `onJoinPrepare`. We can update it once at the end of `onJoinPrepare`.
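This suggestion relies on the timer caching the current time. `remainingMs()` only reflects elapsed time after `update()` re-reads the clock. As far as we know, Kafka's `Timer` follows this update-then-query pattern. The class below is a hypothetical, self-contained stand-in, not Kafka's class. It shows why a single `update()` at the end of `onJoinPrepare` is enough when nothing inside the method checks for expiry.

```java
import java.util.function.LongSupplier;

public class CachedTimerSketch {
    // Hypothetical timer that caches "now" (not Kafka's Timer). Queries use the cached
    // value, so elapsed time only becomes visible after update() re-reads the clock.
    static final class CachedTimer {
        private final LongSupplier clock;
        private final long deadlineMs;
        private long currentMs;

        CachedTimer(LongSupplier clock, long timeoutMs) {
            this.clock = clock;
            this.currentMs = clock.getAsLong();
            this.deadlineMs = currentMs + timeoutMs;
        }

        void update() {
            currentMs = clock.getAsLong();
        }

        long remainingMs() {
            return Math.max(0, deadlineMs - currentMs);
        }

        boolean isExpired() {
            return currentMs >= deadlineMs;
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        CachedTimer timer = new CachedTimer(() -> now[0], 100);
        now[0] += 30; // e.g. waiting on the commit response
        now[0] += 30; // e.g. the retry backoff
        System.out.println(timer.remainingMs()); // prints 100: still the stale cached time
        timer.update();                          // single refresh at the end
        System.out.println(timer.remainingMs()); // prints 40
    }
}
```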



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+    TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      if (assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) {
+        // cooperative rebalance should rebalance twice before finally stable
+        assertEquals(stableGeneration + 2, generationId1)
+      } else {
+        // eager rebalance should rebalance once once before finally stable

Review Comment:
   nit: additional `once`: eager rebalance should rebalance once [once] before finally stable
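The assertion under discussion encodes how many generation bumps each protocol needs after `consumer2` joins. The sketch below follows the test's own expectation. The cooperative assignor takes two rebalances: partitions are first revoked, then reassigned in a follow-up rebalance. Eager assignors take one. The class and method names are hypothetical.

```java
public class ExpectedGenerationSketch {
    static final String COOPERATIVE_STICKY = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    /**
     * Generation consumer1 should reach once the group is stable again after consumer2 joins:
     * cooperative = two rebalances (revoke, then reassign), eager = one.
     */
    static int expectedGeneration(String assignmentStrategy, int stableGeneration) {
        int rebalances = COOPERATIVE_STICKY.equals(assignmentStrategy) ? 2 : 1;
        return stableGeneration + rebalances;
    }
}
```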



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)
+        // Otherwise, continue to revoke partitions, ex:
+        // 1. if joinPrepareTime has expired
+        // 2. if offset commit failed with no-retryable exception
+        // 3. if offset commit success
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group");
+            } else if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));

Review Comment:
   I think we should sleep with `pollTimer` here. Otherwise, if `joinPrepareTimer.remainingMs()` is less than both `timer.remainingMs()` and `rebalanceConfig.retryBackoffMs`, we'll sleep past the `joinPrepareTimer` deadline, right? That is:
   ```java
   pollTimer.sleep(Math.min(pollTimer.remainingMs(), rebalanceConfig.retryBackoffMs));
   timer.update();
   return false;
   ```
   Also, we don't need to update `joinPrepareTimer` here: we won't use it until the next time we enter `onJoinPrepare`, and we already update it on entry. Does that make sense?
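The hazard can be shown with two timers on one clock. This sketch uses a hypothetical mock clock and timer, not Kafka's `MockTime` or `Timer`. Take a 1000 ms poll timer, a 50 ms `joinPrepareTimer` and a 100 ms backoff. Sleeping on `timer` would sleep 100 ms and overshoot the join-prepare deadline by 50 ms. Choosing the earlier-expiring timer, as suggested, sleeps only 50 ms.

```java
public class BackoffSketch {
    // Hypothetical mock clock: sleeping simply advances the current time.
    static final class MockClock {
        long nowMs;

        void sleep(long ms) {
            nowMs += ms;
        }
    }

    // Hypothetical timer reading the shared mock clock (not Kafka's Timer).
    static final class SimpleTimer {
        private final MockClock clock;
        private final long deadlineMs;

        SimpleTimer(MockClock clock, long timeoutMs) {
            this.clock = clock;
            this.deadlineMs = clock.nowMs + timeoutMs;
        }

        long remainingMs() {
            return Math.max(0, deadlineMs - clock.nowMs);
        }
    }

    /** Backs off on whichever timer expires first, so neither deadline is overshot. */
    static long backoff(MockClock clock, SimpleTimer timer, SimpleTimer joinPrepareTimer, long retryBackoffMs) {
        SimpleTimer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? timer : joinPrepareTimer;
        long sleepMs = Math.min(pollTimer.remainingMs(), retryBackoffMs);
        clock.sleep(sleepMs);
        return sleepMs;
    }

    public static void main(String[] args) {
        MockClock clock = new MockClock();
        SimpleTimer timer = new SimpleTimer(clock, 1000);          // caller's poll timeout
        SimpleTimer joinPrepareTimer = new SimpleTimer(clock, 50); // remaining rebalance budget
        System.out.println(backoff(clock, timer, joinPrepareTimer, 100)); // prints 50
        System.out.println(joinPrepareTimer.remainingMs());              // prints 0
    }
}
```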



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Why can't we call `prepareCoordinatorForCloseTest` here? I think the only difference is that we don't call `coordinator.poll`, right? If so, we can add a parameter to `prepareCoordinatorForCloseTest` that determines whether to poll. Otherwise there is a lot of duplicated code here. E.g.:
   ```java
   private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                              final boolean autoCommit,
                                                              final Optional<String> groupInstanceId,
                                                              final boolean shouldPoll) {
   
   ...
   if (shouldPoll) {
     coordinator.poll(time.timer(Long.MAX_VALUE));
   }
   }
   ```
   
   WDYT?
   
   The same comment applies to the newly added tests below.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Also, since we create a new coordinator, we should close it at the end of the test. If you agree with my suggestion above, we can close it like this:
   ```java
   try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(...)) {
   
   
   }
   ```
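The two suggestions can be sketched together: a `shouldPoll` flag on the shared setup helper, and try-with-resources to close the coordinator. `FakeCoordinator` and `prepareCoordinator` are hypothetical stand-ins for `ConsumerCoordinator` and `prepareCoordinatorForCloseTest`. The point is that the coordinator is closed even when the test body throws, e.g. on a failed assertion.

```java
import java.util.function.Consumer;

public class CloseCoordinatorSketch {
    // Hypothetical stand-in for the coordinator under test (not Kafka's ConsumerCoordinator).
    static final class FakeCoordinator implements AutoCloseable {
        int polls;
        boolean closed;

        void poll() {
            polls++;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical helper in the spirit of prepareCoordinatorForCloseTest(..., shouldPoll):
    // shared setup lives here, and only the final poll is optional.
    static FakeCoordinator prepareCoordinator(boolean shouldPoll) {
        FakeCoordinator coordinator = new FakeCoordinator();
        // shared setup (find coordinator, join, sync) would go here
        if (shouldPoll) {
            coordinator.poll();
        }
        return coordinator;
    }

    // try-with-resources closes the coordinator even if the test body throws.
    static void runTest(boolean shouldPoll, Consumer<FakeCoordinator> body) {
        try (FakeCoordinator coordinator = prepareCoordinator(shouldPoll)) {
            body.accept(coordinator);
        }
    }
}
```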



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(rebalanceConfig.retryBackoffMs);

Review Comment:
   > Since we cannot ensure that UnknownTopicOrPartitionException is caused by topic deletion(as said in [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310)) , do you think wait rebalanceTimeout if commit offset failed is acceptable here?
   
   Since we can't be sure what causes `UnknownTopicOrPartitionException` or any other exception here, I think we should just retry until the timeout.
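The retry-until-timeout decision can be written as a small pure function. It matches the decision comments in the later revision of `onJoinPrepare` quoted earlier in this thread. This is a hypothetical sketch: `CommitState` stands in for inspecting the `RequestFuture`. A return value of `true` means "stop waiting, then revoke partitions and rejoin".

```java
public class JoinPrepareDecisionSketch {
    // Hypothetical summary of what the RequestFuture reports after polling.
    enum CommitState { IN_FLIGHT, SUCCEEDED, FAILED_RETRIABLE, FAILED_NON_RETRIABLE }

    /**
     * @param state                   null if no commit request was sent
     * @param joinPrepareTimerExpired whether the rebalance-timeout budget is used up
     * @return true to continue revoking partitions and rejoining, false to back off and retry
     */
    static boolean commitCompleted(CommitState state, boolean joinPrepareTimerExpired) {
        if (state == null) {
            return true; // nothing to wait for
        }
        if (joinPrepareTimerExpired) {
            return true; // give up on the commit and continue to join the group
        }
        switch (state) {
            case IN_FLIGHT:
            case FAILED_RETRIABLE:
                return false; // keep waiting or retrying until the timeout
            default:
                return true; // succeeded, or failed with a non-retriable error
        }
    }
}
```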





[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r909364269


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   LGTM! thanks.





[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is still called here, the problem will recur: the user's call to KafkaConsumer's `poll` method will still block, and `rebalanceConfig.rebalanceTimeoutMs` may be much larger than `pollDuration`.
   
   Suggest:
   1. Promote the `future` variable in `onJoinPrepare` to an instance variable of `ConsumerCoordinator`. The variable could tentatively be named `rebalanceAutoCommitFuture`, with an initial value of `null`:
       `private RequestFuture<Void> rebalanceAutoCommitFuture = null;`
   
   2. Refactor the `onJoinPrepare` method so that `rebalanceAutoCommitFuture` can complete across multiple calls to the user's `poll` method, without blocking any of them.
   ```java
           boolean onJoinPrepareAsyncCommitCompleted = false;
           if (autoCommitEnabled && rebalanceAutoCommitFuture == null) {
               // async commit offsets prior to rebalance if auto-commit enabled
               rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
           }
           if (rebalanceAutoCommitFuture != null) {
               client.poll(rebalanceAutoCommitFuture, time.timer(0));
           }
           // return true when
           // 1. future is null, which means no commit request sent, so it is still considered completed
           // 2. offset commit completed
           // 3. offset commit failed with non-retriable exception
           if (rebalanceAutoCommitFuture == null)
               onJoinPrepareAsyncCommitCompleted = true;
           else if (rebalanceAutoCommitFuture.succeeded()) {
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           } else if (rebalanceAutoCommitFuture.failed() && !rebalanceAutoCommitFuture.isRetriable()) {
               log.error("Asynchronous auto-commit of offsets failed: {}", rebalanceAutoCommitFuture.exception().getMessage());
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           }
   ```
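Below is a minimal, self-contained sketch of this non-blocking pattern. It assumes `CompletableFuture` as a stand-in for Kafka's internal `RequestFuture<Void>`, and a hypothetical `sendAutoCommit` supplier in place of `maybeAutoCommitOffsetsAsync()`. Retriable-failure handling is deliberately left out. The key property is that the commit is sent once, and later calls only check the stored future, so each `poll` returns promptly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class NonBlockingJoinPrepareSketch {
    private final boolean autoCommitEnabled;
    // Hypothetical stand-in for maybeAutoCommitOffsetsAsync(); may return null if there is nothing to commit.
    private final Supplier<CompletableFuture<Void>> sendAutoCommit;
    // Promoted to a field so it survives across onJoinPrepare calls.
    private CompletableFuture<Void> rebalanceAutoCommitFuture = null;

    NonBlockingJoinPrepareSketch(boolean autoCommitEnabled, Supplier<CompletableFuture<Void>> sendAutoCommit) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.sendAutoCommit = sendAutoCommit;
    }

    /** Never blocks: returns false while the commit is in flight, true once it is done or unnecessary. */
    boolean onJoinPrepare() {
        if (autoCommitEnabled && rebalanceAutoCommitFuture == null) {
            rebalanceAutoCommitFuture = sendAutoCommit.get(); // at most one commit in flight
        }
        if (rebalanceAutoCommitFuture == null) {
            return true; // auto-commit disabled or nothing to commit
        }
        if (!rebalanceAutoCommitFuture.isDone()) {
            return false; // let the caller's poll() return; check again on the next call
        }
        // Completed (successfully or not; retriable-error handling omitted in this sketch).
        rebalanceAutoCommitFuture = null;
        return true;
    }
}
```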





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924670900


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);

Review Comment:
   added, thanks



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +850,13 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;

Review Comment:
   added, thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3540,7 +3353,7 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
             MockTime time = new MockTime(1);
 
             // onJoinPrepare will be executed and onJoinComplete will not.
-            boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
+            boolean res = coordinator.joinGroupIfNeeded(time.timer(100));

Review Comment:
   Because we added a `timer.update();` at the end of `onJoinPrepare`, this test failed on my local machine, so I had to increase the timeout.
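A 2 ms timer is fragile here because, as we understand it, `MockTime(1)` is an auto-ticking clock that advances 1 ms every time it is read. Each extra `timer.update()` therefore consumes part of the timer's budget. The hypothetical sketch below models only that behaviour; it is not Kafka's `MockTime`. It counts how many clock reads fit before a timer expires.

```java
public class AutoTickSketch {
    // Hypothetical clock that advances by autoTickMs on every read; modelled on our
    // understanding of MockTime(autoTickMs), not on its actual implementation.
    static final class AutoTickClock {
        private final long autoTickMs;
        private long nowMs;

        AutoTickClock(long autoTickMs) {
            this.autoTickMs = autoTickMs;
        }

        long milliseconds() {
            nowMs += autoTickMs;
            return nowMs;
        }
    }

    /** Counts the clock reads (e.g. timer.update() calls) that fit before a timeoutMs timer expires. */
    static int readsBeforeExpiry(long autoTickMs, long timeoutMs) {
        AutoTickClock clock = new AutoTickClock(autoTickMs);
        long deadlineMs = clock.milliseconds() + timeoutMs; // creating the timer is itself a read
        int reads = 0;
        while (clock.milliseconds() < deadlineMs) {
            reads++;
        }
        return reads;
    }

    public static void main(String[] args) {
        System.out.println(readsBeforeExpiry(1, 2));   // prints 1
        System.out.println(readsBeforeExpiry(1, 100)); // prints 99
    }
}
```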





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924677278


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Thanks! The test looks much simpler.





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676541


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   I've deleted it.





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922694080


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {

Review Comment:
   I don't think so; I just removed it (and refined the `if` section a little).
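   In the revision quoted above, the second `else if` repeats `failed() && isRetriable()`, so its logging branch can never run; it was presumably meant to check `!isRetriable()`. Below is a minimal sketch of the intended decision. It follows the rules stated in the diff comments: keep waiting while the commit is pending or has failed with a retriable error. It also treats an expired join-prepare timer as a reason to stop waiting in both cases, as a later revision's comments describe. `CommitState` and `readyToRejoin` are hypothetical names, not Kafka's code.

   ```java
   // Hypothetical summary of the auto-commit future's state at the end of one onJoinPrepare call.
   enum CommitState { NONE_SENT, PENDING, SUCCEEDED, FAILED_RETRIABLE, FAILED_NON_RETRIABLE }

   final class JoinPrepareDecision {
       // true: proceed to revoke partitions and rejoin; false: return and retry on the next call.
       static boolean readyToRejoin(CommitState state, boolean joinPrepareTimerExpired) {
           switch (state) {
               case NONE_SENT:            // no commit was sent (e.g. auto-commit disabled)
               case SUCCEEDED:            // offsets are safely committed
               case FAILED_NON_RETRIABLE: // would be logged; retrying cannot help
                   return true;
               case PENDING:              // response not received yet
               case FAILED_RETRIABLE:     // resend on the next call
                   return joinPrepareTimerExpired; // stop waiting once the deadline passes
               default:
                   throw new IllegalArgumentException("unknown state: " + state);
           }
       }
   }
   ```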





[GitHub] [kafka] dajac commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1185498396

   @showuon Unfortunately, I don't have time to review this one, and I will be away for the next three weeks. Don't wait on me.




[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r909314930


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   Thanks @RivenSun2, I had forgotten the purpose of our previous change.
   The purpose of the offset commit in `onJoinPrepare` is to make sure that, after the rebalance, the other consumers can start from the correct offset, so this offset commit must complete with the correct offset. With @RivenSun2's suggestion, if we store the future as an instance variable and make sure there is only one in-flight future, we can fix the endless offset commits in this PR. And since the group's consumers cannot consume messages while the rebalance is in progress, we can be sure this offset commit is the only one, and the most up-to-date one, before rebalancing. So I think this is a good solution.
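   A minimal sketch of the single-in-flight idea. It uses `java.util.concurrent.CompletableFuture` as a stand-in for Kafka's internal `RequestFuture`. `SingleInFlightCommit`, `commitIfIdle` and `clearIfDone` are hypothetical names. Repeated calls while a commit is outstanding reuse the same future instead of sending a new request each time.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   // Hypothetical holder that keeps at most one in-flight offset-commit future.
   final class SingleInFlightCommit {
       private CompletableFuture<Void> inFlight; // null when no request is outstanding
       private int sent = 0;                     // number of requests actually issued

       // Returns the outstanding future, sending a new request only if none is in flight.
       CompletableFuture<Void> commitIfIdle(Supplier<CompletableFuture<Void>> sendCommit) {
           if (inFlight == null) {
               inFlight = sendCommit.get();
               sent++;
           }
           return inFlight;
       }

       // Frees the slot once the response has arrived, so a later attempt can commit again.
       void clearIfDone() {
           if (inFlight != null && inFlight.isDone()) {
               inFlight = null;
           }
       }

       int requestsSent() { return sent; }
   }
   ```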
   
   For the comment:
   >  it will cause KafkaConsumer.poll() to be in a busy loop between the time when commitOffsetRequest is in flight.
   
   I agree. I'm thinking we can back off when `onJoinPrepare` returns false, like this:
   ```java
   // return false when onJoinPrepare is waiting for the offset commit to complete
   if (!onJoinPrepare(generation.generationId, generation.memberId)) {
       needsJoinPrepare = true;

       // back off for a while before retrying  <-- newly added
       timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));

       // should not initiateJoinGroup if needsJoinPrepare is still true
       return false;
   }
   ```
   
   WDYT? @RivenSun2 @aiquestion ?
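   A minimal sketch of how the proposed `Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs)` bound behaves, assuming every attempt fails to finish the commit. `BackoffSketch` is hypothetical and only simulates the arithmetic (no real sleeping). With a 250 ms budget and a 100 ms backoff, the sleeps are 100, 100 and 50 ms, so the caller's timer is never overrun and there is no busy loop.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class BackoffSketch {
       // Returns the sleep durations used until the caller's time budget is exhausted,
       // sleeping min(remaining, retryBackoffMs) after each unfinished attempt.
       static List<Long> backoffSleeps(long budgetMs, long retryBackoffMs) {
           List<Long> sleeps = new ArrayList<>();
           long remainingMs = budgetMs;
           while (remainingMs > 0) {
               long sleepMs = Math.min(remainingMs, retryBackoffMs); // never overshoot the budget
               sleeps.add(sleepMs);
               remainingMs -= sleepMs;
           }
           return sleeps;
       }
   }
   ```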





[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r909123353


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   1. It is not easy to find a suitable value for 'the max retry time'. The most important point is that we should ensure that `KafkaConsumer`'s `poll` method always returns within the `pollDuration` time.
   2. I don't think it's much of a problem to wait for `rebalanceAutoCommitFuture` to complete across multiple calls to the `poll` method.
   Let's get advice from other experts.
   Thanks.
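   A minimal sketch of how both constraints can hold at once. It assumes each blocking wait is capped by whichever is smaller: the time left in the current `poll(Duration)` call, or the time left before an overall join-prepare deadline. This is the same choice the later revision makes with `pollTimer`. `PollBudget` is hypothetical and simulates time arithmetically. For example, `pollsNeeded(100, 60_000, 250)` returns 3: the commit response lands during the third 100 ms poll, and no single call blocks longer than 100 ms.

   ```java
   final class PollBudget {
       // How long one blocking wait may last: the smaller of the two remaining budgets.
       static long waitMs(long pollRemainingMs, long joinPrepareRemainingMs) {
           return Math.min(pollRemainingMs, joinPrepareRemainingMs);
       }

       // Counts poll() calls of pollDurationMs each until a response arriving at responseMs
       // is seen; returns -1 if the overall deadline expires first.
       static int pollsNeeded(long pollDurationMs, long deadlineMs, long responseMs) {
           long elapsed = 0;
           int polls = 0;
           while (elapsed < deadlineMs) {
               long wait = waitMs(pollDurationMs, deadlineMs - elapsed);
               polls++;
               if (elapsed + wait >= responseMs) {
                   return polls; // response arrived during this poll
               }
               elapsed += wait;
           }
           return -1;
       }
   }
   ```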





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r925138410


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
   @aiquestion, could you file another PR to remove this line? It's unneeded, right?





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924022486


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Also, since we create a new coordinator, we should close it at the end of the test; otherwise, it'll break other tests. If you agree with my suggestion above, we can close it like this:
   ```java
   try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(...)) {
       // test body; the coordinator is closed automatically when this block exits
   }
   ```
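   A minimal sketch of why try-with-resources helps here, using a hypothetical `FakeCoordinator` that tracks open instances in shared static state: `close()` runs whether the test body returns normally or throws, so a failing assertion cannot leak the instance into later tests.

   ```java
   // Hypothetical stand-in for a coordinator that must always be closed after a test.
   final class FakeCoordinator implements AutoCloseable {
       static int openInstances = 0; // shared state that a leaked instance would pollute

       FakeCoordinator() { openInstances++; }

       @Override
       public void close() { openInstances--; }
   }

   final class TryWithResourcesDemo {
       // Runs a "test body" that may throw; the coordinator is closed either way.
       static void runTest(boolean failInside) {
           try (FakeCoordinator coordinator = new FakeCoordinator()) {
               if (failInside) {
                   throw new IllegalStateException("simulated assertion failure");
               }
           }
       }
   }
   ```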





[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1170699158

   OK, I'll make the change and add an integration test. :-)




[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1188561057

   @aiquestion, since we've already fixed the bug and the other comments are just refactoring, I think we can fix the broken tests and merge this for now, and then you can create a follow-up PR to address my other comments. Is that OK with you? Or, if you'd like to address all my comments right away, that's also fine with me.
   
   PS. The failed tests can be fixed by addressing this comment: https://github.com/apache/kafka/pull/12349#discussion_r924022486 . Thanks.




[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924674693


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -191,7 +191,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * @param memberId The identifier of this member in the previous group or "" if there was none
      * @return true If onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract boolean onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId);

Review Comment:
   Fixed, thanks.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)

Review Comment:
   Fixed, thanks.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:

Review Comment:
   Fixed, thanks.





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r923485561


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
 
         pollTimer = time.timer(100L);
         time.sleep(150);
-        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(rebalanceTimeoutMs);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));

Review Comment:
   Added. I think I need to call `onJoinPrepare` once to initialize the `joinPrepareTimer`.
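   A minimal sketch of the lazy initialization described here. It assumes a deadline that starts on the first attempt and is only checked, never reset, on later attempts. `LazyDeadline` is hypothetical. Time that passes before the first call, such as a sleep earlier in a test, does not count against the deadline.

   ```java
   // Hypothetical model: the join-prepare deadline is created on the first attempt only.
   final class LazyDeadline {
       private final long timeoutMs;
       private Long deadlineMs; // null until the first attempt, like joinPrepareTimer == null

       LazyDeadline(long timeoutMs) { this.timeoutMs = timeoutMs; }

       // Called at the start of each attempt; returns true once the deadline has passed.
       boolean onAttempt(long nowMs) {
           if (deadlineMs == null) {
               deadlineMs = nowMs + timeoutMs; // the clock starts at the first attempt
           }
           return nowMs >= deadlineMs;
       }
   }
   ```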





[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189713426

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
   ```




[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r926201391


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
   Okay.





[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189224423

   @showuon Thanks a lot for your comments.
   I think I've addressed all of them. And yes, I can start a new PR for further refinements if you think the current code is okay to merge.




[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922707608


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(rebalanceConfig.retryBackoffMs);

Review Comment:
   If we don't back off here, I think another offset commit request will be sent with no delay when the user just calls consumer.poll() in a while loop. So I changed it to `Math.min(pollTimer.remainingMs(), rebalanceConfig.retryBackoffMs)`.
   
   But I think we still face the issue in [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310): if the offset commit request fails with `UnknownTopicOrPartitionException`, the consumer will keep retrying the commit until rebalanceTimeout is reached (the only difference is that consumer.poll() will return when its timer expires).
   Since we cannot be sure that `UnknownTopicOrPartitionException` is caused by topic deletion (as described in [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310)), do you think waiting up to rebalanceTimeout when the offset commit fails is acceptable here? 
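A minimal sketch of the bounded backoff described in the comment above, assuming `remainingPollMs` stands in for `pollTimer.remainingMs()` and `retryBackoffMs` for `rebalanceConfig.retryBackoffMs`; the class and method names are hypothetical, not Kafka's API. It shows why `Math.min` is used: the consumer still pauses between offset commit attempts, but never sleeps past the deadline of the user's `poll()` call.

```java
/** Hypothetical helper illustrating a backoff capped by the caller's remaining poll time. */
final class JoinPrepareBackoff {
    private JoinPrepareBackoff() {}

    /**
     * Returns how long to sleep before the next onJoinPrepare attempt:
     * the retry backoff, but never more than the time left on the poll timer, and never negative.
     */
    static long backoffMs(long remainingPollMs, long retryBackoffMs) {
        if (remainingPollMs <= 0) {
            return 0L; // poll timer already expired: return to the caller immediately
        }
        return Math.min(remainingPollMs, retryBackoffMs);
    }
}
```

For example, with 30 ms left on the poll timer and a 100 ms backoff, the sketch sleeps only 30 ms, so `poll()` can still return on time.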
   





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922938170


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {

Review Comment:
   nit: In Kafka, we usually don't use underscores (`_`) in test names. Could we replace it with `should`, e.g. `testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout()`, and apply the same to the other tests? Thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {
+        rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                new Metrics(),
+                assignors,
+                true,
+                subscriptions);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+
+        coordinator.ensureActiveGroup();
+        subscriptions.seek(t1p, 100L);
+
+        int generationId = 42;
+        String memberId = "consumer-42";
+
+        Timer pollTimer = time.timer(100L);
+        time.sleep(150);
+        boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
 
         pollTimer = time.timer(100L);
         time.sleep(150);
-        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(rebalanceTimeoutMs);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));

Review Comment:
   Could we add a test that only tests `KeepJoinAfterRebalanceTimeout`? That is, 
   ```
           pollTimer = time.timer(100L);
           time.sleep(rebalanceTimeoutMs);
           // no offset commit response
           res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
           // still return true
           assertTrue(res);
   ```
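The behavior the suggested test checks can be modeled without the coordinator: once the join-prepare deadline (the rebalance timeout) has passed, `onJoinPrepare` should return true even though no offset commit response has arrived. A simplified model, assuming a millisecond clock passed in explicitly instead of Kafka's `Timer`/`MockTime`; `JoinPrepareDeadlineModel` is hypothetical.

```java
/**
 * Hypothetical model of the join-prepare deadline: the rebalance may proceed once the
 * offset commit has completed, or once rebalanceTimeoutMs has elapsed since the first attempt.
 */
final class JoinPrepareDeadlineModel {
    private final long rebalanceTimeoutMs;
    private long deadlineMs = -1L; // -1 means no join-prepare phase is in progress

    JoinPrepareDeadlineModel(long rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    boolean onJoinPrepare(long nowMs, boolean commitCompleted) {
        if (deadlineMs < 0) {
            deadlineMs = nowMs + rebalanceTimeoutMs; // the deadline starts with the first attempt
        }
        boolean proceed = commitCompleted || nowMs >= deadlineMs;
        if (proceed) {
            deadlineMs = -1L; // reset so the next rebalance gets a fresh deadline
        }
        return proceed;
    }
}
```

With a 1000 ms timeout, `onJoinPrepare(0, false)` and `onJoinPrepare(150, false)` both return false, and `onJoinPrepare(1150, false)` returns true, matching the `assertTrue(res)` expected in the snippet.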





[GitHub] [kafka] showuon merged pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon merged PR #12349:
URL: https://github.com/apache/kafka/pull/12349




[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is still called here, the problem may recur, and the user's call to KafkaConsumer's `poll` method may still block, because `rebalanceConfig.rebalanceTimeoutMs` may be much larger than `pollDuration`.
   
   Suggest:
   1. Promote the `future` variable in `onJoinPrepare` to the instance variable of `ConsumerCoordinator`. The variable name can be tentatively `rebalanceAutoCommitFuture`, and the initial value is `null`.
       `private RequestFuture<Void> rebalanceAutoCommitFuture = null;`
   
   2. Refactor the `onJoinPrepare` method. The `rebalanceAutoCommitFuture` can be completed after the user has called the `poll` method multiple times without blocking the user's `poll` method.
   ```
           boolean onJoinPrepareAsyncCommitCompleted = false;
           if(autoCommitEnabled && rebalanceAutoCommitFuture == null){
               // async commit offsets prior to rebalance if auto-commit enabled
               rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
           }
           if (rebalanceAutoCommitFuture != null) {
               client.poll(rebalanceAutoCommitFuture, time.timer(0));
           }
           // return true when
           // 1. future is null, which means no commit request sent, so it is still considered completed
           // 2. offset commit completed
           // 3. offset commit failed with non-retriable exception
           if (rebalanceAutoCommitFuture == null)
               onJoinPrepareAsyncCommitCompleted = true;
           else if (rebalanceAutoCommitFuture.succeeded()) {
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           } else if (rebalanceAutoCommitFuture.failed() && !rebalanceAutoCommitFuture.isRetriable()) {
               log.error("Asynchronous auto-commit of offsets failed: {}", rebalanceAutoCommitFuture.exception().getMessage());
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           }
   ```
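A self-contained sketch of the same idea, using hypothetical stand-ins (`CommitState`, `PendingCommit`, `RebalanceAutoCommit`) rather than Kafka's `RequestFuture` and `ConsumerCoordinator`. It keeps the in-flight commit in a field so that each `onJoinPrepare` call only checks its status and never waits. One assumption differs from the snippet above: on a retriable failure the sketch drops the stored request so the next call sends a fresh one, which matches the later revision of the PR that resets `autoCommitOffsetRequestFuture` once it is done. In the snippet as written, a retriable failure appears to leave the future set, so no retry would be sent. The sketch also omits the rebalance-timeout cap and the error log for non-retriable failures.

```java
import java.util.function.Supplier;

/** Possible states of a hypothetical async offset commit (a stand-in for Kafka's RequestFuture). */
enum CommitState { PENDING, SUCCEEDED, RETRIABLE_FAILURE, FATAL_FAILURE }

/** Hypothetical handle for one in-flight offset commit request. */
final class PendingCommit {
    volatile CommitState state = CommitState.PENDING;
}

/** Keeps the commit request across onJoinPrepare calls so no single poll() has to block. */
final class RebalanceAutoCommit {
    private final boolean autoCommitEnabled;
    private final Supplier<PendingCommit> sendCommit; // returns null when there is nothing to commit
    private PendingCommit inFlight;                    // survives between poll() calls
    int requestsSent = 0;                              // exposed only for illustration

    RebalanceAutoCommit(boolean autoCommitEnabled, Supplier<PendingCommit> sendCommit) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.sendCommit = sendCommit;
    }

    /** Returns true when the rebalance may proceed; never waits for the response. */
    boolean onJoinPrepare() {
        if (!autoCommitEnabled) {
            return true;
        }
        if (inFlight == null) {
            inFlight = sendCommit.get();
            if (inFlight == null) {
                return true; // no commit request was needed, so treat it as completed
            }
            requestsSent++;
        }
        switch (inFlight.state) {
            case PENDING:
                return false; // response not received yet: let poll() return and check again later
            case RETRIABLE_FAILURE:
                inFlight = null; // forget the failed request so the next call sends a fresh one
                return false;
            default:
                inFlight = null; // SUCCEEDED or FATAL_FAILURE: the commit phase is over
                return true;
        }
    }
}
```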





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924600012


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   Actually, I wrote it to show the behavior of the [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310) case after this change.
   It's not a behavior I want to protect from future changes, so I think I can delete it.





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924675152


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();

Review Comment:
   fixed, thanks





[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1168158408

   @showuon  test added. Thanks~




[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is still called here, the problem will recur, and the user's call to KafkaConsumer's `poll` method will still block, because `rebalanceConfig.rebalanceTimeoutMs` may be much larger than `pollDuration`.
   
   Suggest:
   1. Promote the `future` variable in `onJoinPrepare` to the instance variable of `ConsumerCoordinator`. The variable name can be tentatively `rebalanceAutoCommitFuture`, and the initial value is `null`.
       `private RequestFuture<Void> rebalanceAutoCommitFuture = null;`
   
   2. Refactor the `onJoinPrepare` method. The `rebalanceAutoCommitFuture` can be completed after the user has called the `poll` method multiple times without blocking the user's `poll` method.
   ```
           boolean onJoinPrepareAsyncCommitCompleted = false;
           if(autoCommitEnabled && rebalanceAutoCommitFuture == null){
               // async commit offsets prior to rebalance if auto-commit enabled
               rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
           }
           if (rebalanceAutoCommitFuture != null) {
               client.poll(rebalanceAutoCommitFuture, time.timer(0));
           }
           // return true when
           // 1. future is null, which means no commit request sent, so it is still considered completed
           // 2. offset commit completed
           // 3. offset commit failed with non-retriable exception
           if (rebalanceAutoCommitFuture == null)
               onJoinPrepareAsyncCommitCompleted = true;
           else if (rebalanceAutoCommitFuture.succeeded()) {
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           } else if (rebalanceAutoCommitFuture.failed() && !rebalanceAutoCommitFuture.isRetriable()) {
               log.error("Asynchronous auto-commit of offsets failed: {}", rebalanceAutoCommitFuture.exception().getMessage());
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           }
   ```





[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1173895320

   @showuon @RivenSun2 I refined some of the code; please take a look when you have time:
   * As @RivenSun2 suggested, use an `autoCommitOffsetRequestFuture` field to store the in-flight offset commit request.
   * Still wait for the response, but bounded by the poll timer, instead of sleeping for a backoff when onJoinPrepare fails. This avoids a busy loop.
   
   I think neither the previous change nor this one will hit the issue in [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310), because if the offset commit fails:
   * in EAGER mode, we revoke all partitions, so the next CommitOffset will always succeed;
   * in COOPERATIVE mode, we revoke all partitions that are no longer in the metadata, so the next CommitOffset will be OK.
   
   However, with this latest change in EAGER mode, we wait for the offset commit request before rejoining, but if the commit fails, no retry request will be sent since we have revoked all partitions.
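The revocation difference described above can be sketched as follows. It is a simplified model under stated assumptions: partitions are plain strings such as `topic1-0` instead of `TopicPartition`, `RebalanceProtocol` is a local enum using the comment's EAGER/COOPERATIVE names, and cooperative revocation is reduced to the single rule the comment mentions (drop partitions no longer present in metadata).

```java
import java.util.Set;
import java.util.TreeSet;

/** Local enum mirroring the two modes named in the comment. */
enum RebalanceProtocol { EAGER, COOPERATIVE }

final class RevocationSketch {
    private RevocationSketch() {}

    /** Returns the partitions a consumer gives up before rejoining, under the simplified rules above. */
    static Set<String> partitionsToRevoke(RebalanceProtocol protocol, Set<String> owned, Set<String> inMetadata) {
        Set<String> revoked = new TreeSet<>();
        if (protocol == RebalanceProtocol.EAGER) {
            revoked.addAll(owned); // eager: give up every owned partition
        } else {
            for (String tp : owned) {
                if (!inMetadata.contains(tp)) {
                    revoked.add(tp); // cooperative: only partitions that disappeared from metadata
                }
            }
        }
        return revoked;
    }
}
```

With owned partitions `{deleted-0, topic1-0, topic1-1}` and metadata containing only `topic1-0` and `topic1-1`, EAGER revokes all three, while COOPERATIVE revokes only `deleted-0`. In both cases the next offset commit no longer includes the deleted partition.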




[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r913507405


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)

Review Comment:
   I ran the test without your change, and it passed. Then I found that the test defaults `ENABLE_AUTO_COMMIT_CONFIG` to false, so please enable it explicitly. Thanks.
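A sketch of the explicit configuration the review asks for, using plain `java.util.Properties` with the string keys behind `ConsumerConfig.GROUP_ID_CONFIG` (`group.id`), `ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG` (`partition.assignment.strategy`) and `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` (`enable.auto.commit`); the `AutoCommitConsumerConfig` helper is hypothetical. Without auto-commit, `onJoinPrepare` has no offset commit to wait for (the diff guards it with `autoCommitEnabled`), so the test cannot reproduce the bug.

```java
import java.util.Properties;

/** Hypothetical helper building consumer properties for the rebalance test. */
final class AutoCommitConsumerConfig {
    private AutoCommitConsumerConfig() {}

    static Properties build(String groupId, String assignorClassName) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("partition.assignment.strategy", assignorClassName);
        // Enable auto-commit explicitly: per the review, the test harness defaults it to false,
        // and without it onJoinPrepare never sends the offset commit under test.
        props.setProperty("enable.auto.commit", "true");
        return props;
    }
}
```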





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r917501430


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")

Review Comment:
   Sorry, I didn't run this test locally yesterday. Fixed it.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))

Review Comment:
   I can't use `classOf` in an annotation; it fails with an "annotation argument needs to be a constant" error. I used the fully-qualified class names instead.
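Java has an analogous rule: annotation element values must be compile-time constants. A small Java sketch, with a hypothetical `@StrategySource` annotation standing in for JUnit's `@ValueSource`: `static final String` fields initialized with literals are accepted, whereas a method call such as `CooperativeStickyAssignor.class.getName()` is not a constant expression and would be rejected at compile time.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;

/** Hypothetical annotation standing in for a parameterized-test value source. */
@Retention(RetentionPolicy.RUNTIME)
@interface StrategySource {
    String[] strings();
}

final class AnnotationConstantSketch {
    // static final Strings initialized with literals are compile-time constants.
    static final String COOPERATIVE = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";

    // Compiles: every element is a constant expression.
    // Using SomeAssignor.class.getName() here instead would not compile,
    // because a method call is not a constant expression.
    @StrategySource(strings = {COOPERATIVE, RANGE})
    static void parameterizedTest() {}

    /** Reads the annotation back at runtime to show the constants were recorded. */
    static String[] declaredStrategies() {
        try {
            Method m = AnnotationConstantSketch.class.getDeclaredMethod("parameterizedTest");
            return m.getAnnotation(StrategySource.class).strings();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```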



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+          return

Review Comment:
   thanks, fixed it
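The later revision of this test inverts the condition (`if (!lock.tryLock(...)) fail(...)`) and releases the lock in a `finally` block. `ReentrantLock.tryLock(long, TimeUnit)` returns true when the lock was acquired, so the original `if (lock.tryLock(...)) fail(...)` failed exactly when locking succeeded. A reusable form of the corrected pattern, as a sketch; `LockedRead.withLock` is a hypothetical helper, not part of the test.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper: run an action while holding a lock acquired with a timeout. */
final class LockedRead {
    private LockedRead() {}

    static <T> T withLock(ReentrantLock lock, long timeoutMs, Supplier<T> action) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting for lock.", e);
        }
        // tryLock returns true on success, so the failure branch is the negated call.
        if (!acquired) {
            throw new IllegalStateException("Timed out while waiting for lock.");
        }
        try {
            return action.get();
        } finally {
            lock.unlock(); // always released, even if the action throws
        }
    }
}
```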





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676347


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+    TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")
+
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      if (assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) {
+        // cooperative rebalance should rebalance twice before finally stable
+        assertEquals(stableGeneration + 2, generationId1)
+      } else {
+        // eager rebalance should rebalance once once before finally stable

Review Comment:
   fixed, thanks
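The generation assertions encode how many rebalances each protocol needs when consumer2 joins. A sketch of that expectation, assuming the strategy string is the fully-qualified assignor class name as in the `@ValueSource` above; `ExpectedGeneration` is hypothetical, and the eager value of 1 is inferred from the test comment because the eager assertion itself is cut off in the quoted hunk. With the cooperative protocol, the first rebalance only revokes the partition that must move, and a second rebalance assigns it to the new member; with the eager protocol, all partitions are revoked and reassigned in one round.

```java
/** Hypothetical helper expressing the generation bump the test expects per strategy. */
final class ExpectedGeneration {
    static final String COOPERATIVE_STICKY = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    private ExpectedGeneration() {}

    /** Generations consumer1 should advance after consumer2 joins and the group stabilizes. */
    static int expectedBump(String assignmentStrategy) {
        // cooperative: revoke in one rebalance, reassign in a follow-up rebalance
        // eager: revoke everything and reassign in a single rebalance
        return COOPERATIVE_STICKY.equals(assignmentStrategy) ? 2 : 1;
    }
}
```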





[GitHub] [kafka] mumrah commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
mumrah commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1188068663

   @aiquestion looks like you've got a few test failures https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12349/11/tests




[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922760493


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();

Review Comment:
   nice catch!



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true

Review Comment:
   Also, since we've changed the logic here, could you add some tests for it? You can refer to these tests: `ConsumerCoordinatorTest#testJoinPrepareAndCommitCompleted` and `ConsumerCoordinatorTest#testJoinPrepareWithDisableAutoCommit`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   Maybe add a debug log here:
   `log.debug("Asynchronous auto-commit of offsets failed with retriable error: {}. Will retry it.", autoCommitOffsetRequestFuture.exception().getMessage());`
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }

Review Comment:
   Should we update both timers here to reflect the time spent waiting in the client poll? i.e.
   ```java
           if (autoCommitOffsetRequestFuture != null) {
               Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
                      timer : joinPrepareTimer;
               client.poll(autoCommitOffsetRequestFuture, pollTimer);
              
               timer.update();
               joinPrepareTimer.update();
           }
   ```
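To illustrate the point, here is a self-contained sketch using a hypothetical `ManualClock` and `SimpleTimer` (not Kafka's classes). It assumes that a timer only observes elapsed time when `update()` is called, and that the poll refreshes only the timer it was handed. Under those assumptions, `joinPrepareTimer` keeps reporting its full budget after the caller's timer has been used up.

```java
public class TimerUpdateSketch {
    // Hypothetical manually advanced clock standing in for a real time source.
    static final class ManualClock {
        private long nowMs;
        long milliseconds() { return nowMs; }
        void advance(long ms) { nowMs += ms; }
    }

    // Hypothetical timer that only sees elapsed time when update() is called.
    static final class SimpleTimer {
        private final ManualClock clock;
        private final long deadlineMs;
        private long currentMs;

        SimpleTimer(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.currentMs = clock.milliseconds();
            this.deadlineMs = currentMs + timeoutMs;
        }

        void update() { currentMs = clock.milliseconds(); }
        long remainingMs() { return Math.max(0, deadlineMs - currentMs); }
    }

    // Stand-in for client.poll(future, pollTimer) when no response arrives:
    // it blocks for the poll timer's full remaining time and refreshes only that timer.
    static void pollWithoutResponse(ManualClock clock, SimpleTimer pollTimer) {
        clock.advance(pollTimer.remainingMs());
        pollTimer.update();
    }

    // Returns {timer.remainingMs(), joinPrepareTimer.remainingMs()} after one poll.
    public static long[] remainingAfterPoll(boolean updateBoth) {
        ManualClock clock = new ManualClock();
        SimpleTimer timer = new SimpleTimer(clock, 1_000);            // caller-supplied timer
        SimpleTimer joinPrepareTimer = new SimpleTimer(clock, 5_000); // rebalance timeout
        SimpleTimer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs()
                ? timer : joinPrepareTimer;
        pollWithoutResponse(clock, pollTimer);
        if (updateBoth) {
            timer.update();
            joinPrepareTimer.update();
        }
        return new long[] {timer.remainingMs(), joinPrepareTimer.remainingMs()};
    }
}
```

With this sketch, `remainingAfterPoll(false)` returns `{0, 5000}` even though 1000 ms have passed, while `remainingAfterPoll(true)` returns `{0, 4000}`.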



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -141,6 +141,9 @@ private boolean sameRequest(final Set<TopicPartition> currentRequest, final Gene
     }
 
     private final RebalanceProtocol protocol;
+    // pending commit offset request in onJoinPrepare
+    private RequestFuture<Void> autoCommitOffsetRequestFuture = null;
+    private Timer joinPrepareTimer = null;

Review Comment:
   Please also add a comment for `joinPrepareTimer`, e.g.:
   `// a timer for join prepare to know when to stop. It'll be set to the rebalance timeout so that the member can join the group successfully even if the offset commit fails.`
   WDYT?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true

Review Comment:
   It's not clear what returning true/false means here. Maybe:
   ```
   // keep retrying the offset commit when:
   // 1. offset commit hasn't completed (and joinPrepareTimer hasn't expired)
   // 2. offset commit failed with a retriable exception (and joinPrepareTimer hasn't expired)
   // Otherwise, continue to revoke partitions, e.g.:
   // 1. if joinPrepareTimer has expired
   // 2. if offset commit failed with a non-retriable exception
   // 3. if offset commit succeeded
   ```
   WDYT?
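The proposed comment amounts to a small decision table. A minimal sketch of it as a pure function, assuming a hypothetical `CommitState` enum that summarizes `autoCommitOffsetRequestFuture` after polling (neither the enum nor the method exists in the Kafka code):

```java
public class JoinPrepareDecisionSketch {
    // Hypothetical summary of autoCommitOffsetRequestFuture after polling.
    public enum CommitState { NOT_SENT, PENDING, SUCCEEDED, FAILED_RETRIABLE, FAILED_NON_RETRIABLE }

    /**
     * Returns true when onJoinPrepare can continue to revoke partitions and
     * false when it should back off and retry the offset commit.
     */
    public static boolean canContinue(CommitState state, boolean joinPrepareTimerExpired) {
        switch (state) {
            case NOT_SENT:             // no commit request, e.g. auto-commit disabled
            case SUCCEEDED:
            case FAILED_NON_RETRIABLE: // retrying would not help; log and move on
                return true;
            case PENDING:
            case FAILED_RETRIABLE:
                // keep retrying only while joinPrepareTimer has time left
                return joinPrepareTimerExpired;
            default:
                throw new IllegalArgumentException("Unknown state: " + state);
        }
    }
}
```

Written this way, a commit that is still pending is also cut short once `joinPrepareTimer` expires, so the member cannot keep retrying indefinitely.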



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));
+            return false;

Review Comment:
   `joinPrepareTimer` should also be updated after the sleep, right?
   `joinPrepareTimer.update();`
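A rough simulation of why this matters, again with hypothetical `ManualClock`/`SimpleTimer` classes rather than Kafka's `Timer`. It assumes every commit attempt fails with a retriable error and that sleeping on `timer` refreshes only `timer`. If `joinPrepareTimer` is never updated, it never observes the backoff time and never expires, giving an endless `onJoinPrepare` loop similar to the one this PR fixes.

```java
public class JoinPrepareBackoffSketch {
    // Hypothetical clock advanced by the sketch instead of really sleeping.
    static final class ManualClock {
        long nowMs;
    }

    // Hypothetical timer that only observes elapsed time on update().
    static final class SimpleTimer {
        private final ManualClock clock;
        private final long deadlineMs;
        private long currentMs;

        SimpleTimer(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.currentMs = clock.nowMs;
            this.deadlineMs = currentMs + timeoutMs;
        }

        void update() { currentMs = clock.nowMs; }
        long remainingMs() { return Math.max(0, deadlineMs - currentMs); }
        boolean isExpired() { return currentMs >= deadlineMs; }

        // Advances the shared clock (no real sleep) and refreshes only this timer.
        void sleep(long ms) {
            clock.nowMs += Math.min(ms, remainingMs());
            update();
        }
    }

    /**
     * Simulates repeated onJoinPrepare calls whose commit always fails with a
     * retriable error. Returns the attempt on which joinPrepareTimer is seen as
     * expired, or -1 if that never happens within maxAttempts.
     */
    public static int attemptsUntilExpired(boolean updateJoinPrepareTimer,
                                           long rebalanceTimeoutMs,
                                           long retryBackoffMs,
                                           int maxAttempts) {
        ManualClock clock = new ManualClock();
        SimpleTimer timer = new SimpleTimer(clock, 1_000_000); // caller-supplied timer
        SimpleTimer joinPrepareTimer = new SimpleTimer(clock, rebalanceTimeoutMs);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (joinPrepareTimer.isExpired()) {
                return attempt; // stop retrying the commit and continue the rebalance
            }
            // Retriable failure: back off, mirroring the diff's timer.sleep(...) call.
            timer.sleep(Math.min(timer.remainingMs(), retryBackoffMs));
            if (updateJoinPrepareTimer) {
                joinPrepareTimer.update();
            }
        }
        return -1;
    }
}
```

With a 1000 ms rebalance timeout and a 100 ms backoff, `attemptsUntilExpired(true, 1000, 100, 1000)` gives up on attempt 11, while `attemptsUntilExpired(false, 1000, 100, 1000)` hits the cap and returns -1.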



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +835,12 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;

Review Comment:
   We should also reset `autoCommitOffsetRequestFuture` to null here, before `onJoinPrepare` completes, right? Otherwise, the next time `onJoinPrepare` is called, it will think there's still an in-flight request.
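A toy model of that scenario, using a hypothetical `Coordinator` class that keeps only the pending-commit field (it is not the real `ConsumerCoordinator`). It assumes the commit never completes, so the join-prepare timer expires and `onJoinPrepare` finishes anyway. If the field is not cleared before returning, the next rebalance sends no new commit because it still sees the stale request as in flight.

```java
public class PendingCommitResetSketch {
    // Hypothetical, heavily simplified coordinator; pendingCommit mirrors
    // autoCommitOffsetRequestFuture and a plain Object stands in for a RequestFuture.
    static final class Coordinator {
        private final boolean resetBeforeReturning;
        private Object pendingCommit;
        int commitsSent;

        Coordinator(boolean resetBeforeReturning) {
            this.resetBeforeReturning = resetBeforeReturning;
        }

        // Simplified onJoinPrepare for a commit that never completes: the
        // join-prepare timer is assumed to have expired, so it returns true anyway.
        boolean onJoinPrepare() {
            if (pendingCommit == null) { // only send when nothing is considered in flight
                pendingCommit = new Object();
                commitsSent++;
            }
            // ... partitions would be revoked here ...
            if (resetBeforeReturning) {
                pendingCommit = null; // alongside joinPrepareTimer = null
            }
            return true;
        }
    }

    // Runs two rebalances back to back and reports how many commits were sent.
    public static int commitsSentOverTwoRebalances(boolean resetBeforeReturning) {
        Coordinator coordinator = new Coordinator(resetBeforeReturning);
        coordinator.onJoinPrepare();
        coordinator.onJoinPrepare();
        return coordinator.commitsSent;
    }
}
```

Here `commitsSentOverTwoRebalances(true)` returns 2, while `commitsSentOverTwoRebalances(false)` returns 1: the second rebalance silently skips its commit.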



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.

Review Comment:
   nit: `// wait for commit offset response before the timer expires`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");

Review Comment:
   Maybe change to:
   `log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group");`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled

Review Comment:
   We should update this comment to:
   `// async commit offsets prior to rebalance if auto-commit enabled and there is no in-flight offset commit request`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   nit: `log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.", autoCommitOffsetRequestFuture.exception().getMessage());`
   





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r921765863


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   These two `else if` conditions are identical. Please fix this.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   I think we should also check `joinPrepareTimer.isExpired()` here, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout");
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(rebalanceConfig.retryBackoffMs);

Review Comment:
   I don't think we should back off here; or, if we do, we should back off for at most `Math.min(pollTimer.remainingMs(), rebalanceConfig.retryBackoffMs)`.
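Here is a minimal sketch of the bounded backoff suggested here. It uses a plain `long` count of remaining milliseconds rather than Kafka's `Timer`, and the class and method names are hypothetical. Capping the sleep at the remaining time keeps a single `onJoinPrepare` call from overrunning the caller's poll timeout.

```java
// Hypothetical helpers illustrating a retry backoff capped by the remaining poll time.
final class BoundedBackoffSketch {

    // Sleep for the configured backoff, but never longer than the time left
    // (and never a negative duration).
    static long backoffMs(long remainingMs, long retryBackoffMs) {
        return Math.max(0L, Math.min(remainingMs, retryBackoffMs));
    }

    // Total time one call would take: time already spent polling plus the backoff sleep.
    // With bounded == false, the full retryBackoffMs is slept regardless of the poll timeout.
    static long totalTimeMs(long pollTimeoutMs, long spentPollingMs, long retryBackoffMs, boolean bounded) {
        long remainingMs = Math.max(0L, pollTimeoutMs - spentPollingMs);
        long sleepMs = bounded ? backoffMs(remainingMs, retryBackoffMs) : retryBackoffMs;
        return spentPollingMs + sleepMs;
    }
}
```

Take a 100 ms poll timeout with 90 ms already spent polling and a 100 ms retry backoff. `totalTimeMs(100, 90, 100, false)` is 190 ms, whereas the bounded version returns 100 ms.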



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());
+            } else if (joinPrepareTimer != null && joinPrepareTimer.isExpired()) {

Review Comment:
   Curious: do we need a null check here? Will this `onJoinPrepare` method be called by another thread?





[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1185155731

   @dajac @guozhangwang, would you like to take another look at this PR? This comment explains what we're trying to do in this PR: https://github.com/apache/kafka/pull/12349#pullrequestreview-1028116488. Thanks.




[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r923485781


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1413,10 +1458,55 @@ public void testOnJoinPrepareWithOffsetCommit() throws InterruptedException {
         time.sleep(150);
         res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
         assertFalse(res);
+        pollTimer = time.timer(100L);
+        time.sleep(150);
+        client.respond(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+        res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+        assertTrue(res);
+
+        assertFalse(client.hasPendingResponses());
+        assertFalse(client.hasInFlightRequests());
+        assertFalse(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommit_KeepJoinAfterRebalanceTimeout() {

Review Comment:
   fixed, thanks~





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922693053


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
 
+        // return false when:
+        //   1. offset commit haven't done
+        //   2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   Thanks! I think it's a bug: in the second branch I meant to check `!autoCommitOffsetRequestFuture.isRetriable()`.
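The intended branch order can be sketched as a standalone classification. Hypothetical booleans stand in for `RequestFuture.isDone()`, `failed()`, `isRetriable()` and `joinPrepareTimer.isExpired()`.

In the diff, the duplicated condition makes the logging branch unreachable. A non-retriable failure still falls through with the default `true`, but its error is never logged. Negating `isRetriable()` in the second branch, as described in the comment, makes that case reachable.

This sketch also checks expiry first, in the spirit of the earlier review comment. It is an illustration, not the PR's code.

```java
// Hypothetical sketch of the intended outcome classification; plain booleans
// stand in for RequestFuture.isDone()/failed()/isRetriable() and timer expiry.
final class CommitOutcomeSketch {

    enum Outcome { RETRY, SUCCEEDED, FAILED_NON_RETRIABLE, TIMED_OUT }

    static Outcome classify(boolean done, boolean failed, boolean retriable, boolean timerExpired) {
        if (timerExpired) {
            return Outcome.TIMED_OUT;              // stop waiting and continue the rebalance
        } else if (!done) {
            return Outcome.RETRY;                  // no response yet
        } else if (failed && retriable) {
            return Outcome.RETRY;                  // transient error: retry the commit
        } else if (failed && !retriable) {
            return Outcome.FAILED_NON_RETRIABLE;   // now reachable: log the error and continue
        }
        return Outcome.SUCCEEDED;
    }

    // The caller proceeds to revoke partitions and rejoin unless a retry is needed.
    static boolean proceed(Outcome outcome) {
        return outcome != Outcome.RETRY;
    }
}
```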





[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676087


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
+
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        // and there is no in-flight offset commit request
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer expired.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+            timer.update();
+            joinPrepareTimer.update();
+        }
+
+        // keep retrying the offset commit when:
+        // 1. offset commit haven't done (and joinPrepareTime not expired)
+        // 2. failed with retryable exception (and joinPrepareTime not expired)
+        // Otherwise, continue to revoke partitions, ex:
+        // 1. if joinPrepareTime has expired
+        // 2. if offset commit failed with no-retryable exception
+        // 3. if offset commit success
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group");
+            } else if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.",
+                          autoCommitOffsetRequestFuture.exception().getMessage());
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));

Review Comment:
   You are right; in my last commit I just tried to update every timer. :-) Fixed, thanks.
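The timer updates matter because a cached-time timer only re-reads the clock when it is explicitly updated. This sketch assumes Kafka's `Timer` behaves that way, refreshing in `update()` and in methods built on it such as `sleep`.

The hypothetical `CachedTimer` below is a simplified stand-in, not Kafka's class. It shows that after a blocking call, the timer reports stale remaining time until it is updated.

```java
import java.util.function.LongSupplier;

// Hypothetical cached-time timer, loosely modeled on how Kafka's Timer caches
// the current time; this is a simplified stand-in, not Kafka's class.
final class CachedTimer {
    private final LongSupplier clock;
    private final long deadlineMs;
    private long currentTimeMs;

    CachedTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.currentTimeMs = clock.getAsLong();
        this.deadlineMs = currentTimeMs + timeoutMs;
    }

    // Re-read the clock; until this is called, the timer reports stale values.
    void update() {
        currentTimeMs = clock.getAsLong();
    }

    long remainingMs() {
        return Math.max(0L, deadlineMs - currentTimeMs);
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }
}
```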



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+    "org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+        }
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to $expectedAssignment.")
+
+    // Since the consumer1 already completed the rebalance,
+    // the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId
+    var stableGeneration = -1
+    var stableMemberId1 = ""
+    if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+      fail(s"Time out while awaiting for lock.")
+    }
+    try {
+      stableGeneration = generationId1
+      stableMemberId1 = memberId1
+    } finally {
+      lock.unlock()
+    }
+
+    val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic))
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1,
+      s"Timed out while awaiting expected assignment change to 1.")

Review Comment:
   fixed, thanks





[GitHub] [kafka] ijuma commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r942511881


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
   Submitted a follow-up here, since it has been 21 days: https://github.com/aiquestion/kafka/pull/1/files





[GitHub] [kafka] ijuma commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r942511881


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
   Submitted a follow-up here, since it has been 21 days: https://github.com/apache/kafka/pull/12500





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r913457476


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()
+    var generationId1 = 0

Review Comment:
   I think the initial value should be an invalid number, e.g. -1. Otherwise, if the final generationId happens to be `0`, this test won't catch any error.
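Here is a minimal sketch of why the sentinel matters, with a hypothetical helper standing in for the rebalance listener. If the field starts at `0` and the real generation also happens to be `0`, the test cannot tell whether the listener ever ran. A `-1` sentinel keeps the two cases apart. This assumes, as the review does, that `-1` is never a valid generation here.

```java
// Hypothetical model of a value written by a rebalance listener and read later by the test.
final class SentinelSketch {

    // Assumption: -1 is never a valid generation id in this test.
    static final int UNSET_GENERATION = -1;

    // Returns what the test would observe: the initial value if the listener
    // never ran, otherwise the generation it recorded.
    static int observedGeneration(int initialValue, boolean listenerRan, int actualGeneration) {
        int generation = initialValue;
        if (listenerRan) {
            generation = actualGeneration;
        }
        return generation;
    }
}
```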



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()
+    var generationId1 = 0
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        lock1.lock()

Review Comment:
   I know a deadlock should not happen in this simple test, but I still think we should rule out the possibility. Could we use `tryLock` instead of `lock`, so that the test cannot block forever?
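The pattern being asked for can be sketched with `java.util.concurrent.locks.ReentrantLock.tryLock(long, TimeUnit)`, the same call the later revision of this test uses. The helper name `runWithLock` is hypothetical. It returns `false` instead of blocking when the lock cannot be acquired in time, and a test would turn that into `fail(...)`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: run an action under the lock, but give up after a timeout
// instead of blocking the test forever.
final class TryLockSketch {

    // Returns false if the lock could not be acquired within timeoutMs.
    static boolean runWithLock(ReentrantLock lock, long timeoutMs, Runnable action) throws InterruptedException {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // a test would call fail(...) here
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the action throws
        }
    }
}
```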



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()

Review Comment:
   Since we don't have a `lock2`, could we name it `lock` here?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"

Review Comment:
   Since this test only needs one topic, we can use the global `topic` variable here.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()
+    var generationId1 = 0
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        lock1.lock()
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock1.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic1), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to 2.")

Review Comment:
   You're not only waiting for the assignment size to reach 2 here; you're also waiting for the assignment content to match the expected one. Maybe you can change the error message to:
   ```
   s"Timed out while awaiting expected assignment change to $expectedAssignment."
   ```
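   A minimal sketch of why the interpolated message helps, assuming a polling helper shaped like the `TestUtils.waitUntilTrue(condition, msg)` call above. `waitUntil` is a hypothetical stand-in, not Kafka's implementation, and plain strings stand in for `TopicPartition`s. Because `msg` is taken by name, the string (including `$expectedAssignment`) is only built if the wait times out, and the failure then names the exact partitions that were expected instead of just a count:

   ```scala
   object WaitSketch {
     // Hypothetical polling helper: re-checks `condition` every `pauseMs`
     // until it holds, or throws an AssertionError once `timeoutMs` has elapsed.
     // `msg` is by-name, so it is evaluated only when the wait fails.
     def waitUntil(condition: () => Boolean, msg: => String,
                   timeoutMs: Long = 1000L, pauseMs: Long = 10L): Unit = {
       val deadline = System.currentTimeMillis() + timeoutMs
       while (!condition()) {
         if (System.currentTimeMillis() > deadline)
           throw new AssertionError(msg)
         Thread.sleep(pauseMs)
       }
     }
   }
   ```

   For example, if the expected assignment is `Set("topic1-0", "topic1-1")` and the consumer is stuck with only `topic1-0`, the thrown message lists both expected partitions.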



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()
+    var generationId1 = 0
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        lock1.lock()

Review Comment:
   Same comment as for the lock below.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {
+    // 2 consumer using cooperative-sticky assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cooperative-sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[CooperativeStickyAssignor].getName)
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic1 = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock1 = new ReentrantLock()
+    var generationId1 = 0
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        lock1.lock()
+        try {
+          generationId1 = consumer1.groupMetadata().generationId()
+          memberId1 = consumer1.groupMetadata().memberId()
+        } finally {
+          lock1.unlock()
+        }
+      }
+    }
+    val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic1), Set.empty, customRebalanceListener)
+    consumerPoller1.start()
+    TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment,
+      s"Timed out while awaiting expected assignment change to 2.")
+
+    var stableGeneration = 0

Review Comment:
   1. Same as above: change the initial value to -1.
   2. Maybe we should add a comment above this line explaining why we can be sure `generationId1` and `memberId1` have already been set. E.g.:
   ```
   // Since consumer1 has already completed the rebalance, the `onPartitionsAssigned` rebalance listener has been invoked to set the generationId and memberId
   ```
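   To illustrate point 1, here is a minimal JDK-only sketch of the pattern the listener and the test thread share (`RecordedMembership`, `record` and `snapshot` are hypothetical names, not part of the test): the listener writes the pair under the lock, the test thread reads it under the same lock, and the `-1` initial value lets an assertion tell "the listener never ran" apart from a recorded generation id.

   ```scala
   import java.util.concurrent.locks.ReentrantLock

   object GroupStateSketch {
     // Hypothetical holder for the values the rebalance listener records.
     final class RecordedMembership {
       private val lock = new ReentrantLock()
       private var generationId = -1 // -1 = listener has not run yet
       private var memberId = ""

       // Called from the polling thread, e.g. inside onPartitionsAssigned.
       def record(gen: Int, member: String): Unit = {
         lock.lock()
         try {
           generationId = gen
           memberId = member
         } finally {
           lock.unlock()
         }
       }

       // Called from the test thread; both fields are read under the same
       // lock, so the returned pair is always consistent.
       def snapshot(): (Int, String) = {
         lock.lock()
         try {
           (generationId, memberId)
         } finally {
           lock.unlock()
         }
       }
     }
   }
   ```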



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +970,71 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testCoopearativeAssigorAndRejoin(): Unit = {

Review Comment:
   1. Maybe rename to `testRebalanceAndRejoin`
   2. Could we also test an eager assignor? E.g.:
   ```
     @ParameterizedTest
     @ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName, classOf[RangeAssignor].getName))
     def testRebalanceAndRejoin(partitionAssignmentStrategy: String): Unit = {
   ```
   I think the only change needed is that the final expected generationId should be +1 or +2, right?




[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r917485378


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))

Review Comment:
   This needs the fully qualified class name to be passed into the `PARTITION_ASSIGNMENT_STRATEGY_CONFIG` config, e.g.:
   `@ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName, classOf[RangeAssignor].getName))`
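   A small JDK-only sketch of the difference, assuming (as described above) that the consumer resolves each `PARTITION_ASSIGNMENT_STRATEGY_CONFIG` entry as a class name. `ReentrantLock` is only a stand-in for the assignor classes, and the string `"cooperative-sticky"` stands in for a short assignor name: `classOf[...].getName` yields a fully qualified name that reflective loading can resolve, while a short name cannot be loaded as a class.

   ```scala
   import java.util.concurrent.locks.ReentrantLock
   import scala.util.Try

   object ClassNameSketch {
     // classOf[...].getName returns the fully qualified class name.
     val fullName: String = classOf[ReentrantLock].getName

     // True if `name` can be loaded reflectively as a class.
     def loadable(name: String): Boolean = Try(Class.forName(name)).isSuccess
   }
   ```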
   



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")

Review Comment:
   Are you sure this is correct?
   ```
   Returns:
       true if the lock was free and was acquired by the current thread, or the lock was already held by the current thread; and false if the waiting time elapsed before the lock could be acquired
   ```
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html#tryLock-long-java.util.concurrent.TimeUnit-
   
   Maybe it should be:
   ```
   if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
     fail(s"Time out while awaiting for lock.")
   }
   ```
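   A minimal sketch of the corrected check, with the lock also released in a `finally` block once it has been acquired. `withLockOrFail` is a hypothetical helper name, and throwing `AssertionError` stands in for the test framework's `fail`:

   ```scala
   import java.util.concurrent.TimeUnit
   import java.util.concurrent.locks.ReentrantLock

   object TryLockSketch {
     // tryLock(timeout, unit) returns true when the lock WAS acquired, so the
     // failure branch tests the negation. unlock() runs only on the path where
     // the lock is actually held.
     def withLockOrFail[T](lock: ReentrantLock, timeoutMs: Long)(body: => T): T = {
       if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS))
         throw new AssertionError("Timed out while waiting for the lock.")
       try body
       finally lock.unlock()
     }
   }
   ```

   When the lock is free, the body runs and the lock is released afterwards; when another thread keeps holding it past the timeout, the call fails instead of proceeding without the lock.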



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME, RangeAssignor.RANGE_ASSIGNOR_NAME))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+    // create 2 consumers
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer1 = createConsumer()
+    val consumer2 = createConsumer()
+
+    // create a new topic, have 2 partitions
+    val topic = "topic1"
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+    assertEquals(0, consumer1.assignment().size)
+    assertEquals(0, consumer2.assignment().size)
+
+    val lock = new ReentrantLock()
+    var generationId1 = -1
+    var memberId1 = ""
+    val customRebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+      }
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+          fail(s"Time out while awaiting for lock.")
+          return

Review Comment:
   `return` is not necessary because `fail` throws an exception directly.
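   A minimal sketch of why anything after `fail` is dead code, assuming `fail` always throws. The `fail` here is a hypothetical stand-in typed as `Nothing`, and the flag only exists to show that execution never continues past the call:

   ```scala
   object FailSketch {
     // Hypothetical stand-in for a test framework's `fail`: it always throws,
     // so its result type is Nothing.
     def fail(message: String): Nothing = throw new AssertionError(message)

     // Records whether execution ever continued past the `fail` call.
     @volatile var continuedAfterFail = false

     def onLockTimeout(): Unit = {
       fail("Timed out while waiting for the lock.")
       continuedAfterFail = true // never executed; a `return` here would be equally unreachable
     }
   }
   ```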


