Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/02 18:22:28 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12244: [DO NOT MERGE] HOTFIX: only try to clear discover-coordinator future upon commit

guozhangwang opened a new pull request, #12244:
URL: https://github.com/apache/kafka/pull/12244

   This is an alternative fix for KAFKA-13563, separate from #11631.
   
   Instead of letting the consumer always try to discover the coordinator in poll under either mode (subscribe / assign), we clear the discover-coordinator future only when committing asynchronously. More specifically, under manual assign mode there are only three places where we need the coordinator:
   
   1) commitAsync (whether issued by the consumer itself or triggered by the caller); this is the case we want to fix.
   2) commitSync, which already tries to rediscover the coordinator.
   3) committed (whether issued by the consumer itself based on the reset policy or triggered by the caller), which already tries to rediscover the coordinator.
   
   The benefit is that a consumer in manual assign mode that never triggers any of the three above will never try to discover the coordinator.
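
The behavior described above can be illustrated with a minimal sketch. It is a toy model, not Kafka's implementation: `ToyLazyCoordinator`, its fields, and the instant "response" in `poll()` are all hypothetical names and simplifications introduced here. It only shows the intended contract, namely that a plain poll under manual assignment sends no coordinator lookup, while an async commit triggers one.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: these names are hypothetical and are not Kafka internals.
class ToyLazyCoordinator {
    private boolean coordinatorKnown = false;
    private boolean lookupInFlight = false;
    private final Queue<Long> pendingCommits = new ArrayDeque<>();
    int lookupsSent = 0;   // number of simulated coordinator lookups
    int commitsSent = 0;   // number of simulated offset commits

    boolean coordinatorUnknown() {
        return !coordinatorKnown;
    }

    // Manual-assignment poll: never starts a lookup by itself; it only
    // completes work that an earlier commitAsync has queued.
    void poll() {
        if (lookupInFlight) {
            // Assumption: the lookup response arrives on the next poll.
            lookupInFlight = false;
            coordinatorKnown = true;
        }
        if (coordinatorKnown) {
            // Send every queued commit once the coordinator is known.
            commitsSent += pendingCommits.size();
            pendingCommits.clear();
        }
    }

    // An async commit is the only trigger for coordinator discovery here.
    void commitAsync(long offset) {
        pendingCommits.add(offset);
        if (!coordinatorKnown && !lookupInFlight) {
            lookupInFlight = true;
            lookupsSent++;
        }
    }
}

class ToyLazyCoordinatorDemo {
    public static void main(String[] args) {
        ToyLazyCoordinator c = new ToyLazyCoordinator();
        c.poll();
        System.out.println("lookups after plain poll: " + c.lookupsSent);
        c.commitAsync(100L);
        c.poll();
        System.out.println("lookups after commitAsync: " + c.lookupsSent
                + ", commits sent: " + c.commitsSent);
    }
}
```

In this model, a consumer that only fetches never contacts the group coordinator, which is the benefit the description claims for manual assign mode.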
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12244: [DO NOT MERGE] HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12244:
URL: https://github.com/apache/kafka/pull/12244#issuecomment-1145176689

   @showuon @ijuma 




[GitHub] [kafka] guozhangwang merged pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12244:
URL: https://github.com/apache/kafka/pull/12244




[GitHub] [kafka] guozhangwang commented on pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12244:
URL: https://github.com/apache/kafka/pull/12244#issuecomment-1147734294

   Cherry-picking to 3.2




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r890401527


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,10 +514,62 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // set timeout to 0 because we expect no requests sent
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertFalse(client.hasInFlightRequests());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertTrue(client.hasInFlightRequests());
+
+            client.respond(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(0));
+            assertFalse(coordinator.coordinatorUnknown());
+            // after we've discovered the coordinator we should send
+            // out the commit request immediately
+            assertTrue(client.hasInFlightRequests());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // set timeout to 0 because we expect no requests sent
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(client.hasInFlightRequests());
+
+        // should try to find coordinator since we are commit async
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+            throw new AssertionError("Commit should not get responses");

Review Comment:
   ack.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889351949


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {

Review Comment:
   ack





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889350182


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

Review Comment:
   ack





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889361765


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+            // set timeout to 0 because we don't want to retry after the error
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+        // set timeout to 0 because we don't want to retry after the error
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());

Review Comment:
   Ack





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
             }
         } else {
-            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+            // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+            // instead we only try to refresh metadata when necessary.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+                client.awaitMetadataUpdate(timer);
             }
+
+            // if there is pending coordinator requests, ensure they have a chance to be transmitted.

Review Comment:
   This is a major change made while addressing @dajac's comment: previously, under manual assignment, the `coordinator.poll` call would not call `networkClient.poll`. This means that if coordinator discovery did not complete within `commitAsync` (note that we call `networkClient.poll` twice in that function, so discovery may complete there), there would be no other place where the network client is polled to complete the pending requests.
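
The problem described in this comment can be shown with a small sketch, assuming a client that only enqueues requests on send and transmits them on poll. `ToyNetworkClient` and `ToyManualAssignPoll` are hypothetical names used for illustration and do not mirror Kafka's actual network client.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy stand-in for a network client; hypothetical, not a Kafka class.
class ToyNetworkClient {
    private final Queue<String> unsent = new ArrayDeque<>();
    int transmitted = 0;

    // send() only enqueues; nothing reaches the wire yet.
    void send(String request) {
        unsent.add(request);
    }

    boolean hasPendingRequests() {
        return !unsent.isEmpty();
    }

    // poll() is the only place where queued requests are transmitted.
    void poll() {
        transmitted += unsent.size();
        unsent.clear();
    }
}

class ToyManualAssignPoll {
    // Earlier behavior as described: the coordinator poll never polls the client,
    // so a queued lookup stays queued.
    static void pollWithoutNetwork(ToyNetworkClient client) {
        // intentionally empty
    }

    // Changed behavior as described: pending requests get a chance to be sent.
    static void pollWithNetwork(ToyNetworkClient client) {
        if (client.hasPendingRequests()) {
            client.poll();
        }
    }

    public static void main(String[] args) {
        ToyNetworkClient client = new ToyNetworkClient();
        client.send("FindCoordinator");
        pollWithoutNetwork(client);
        System.out.println("transmitted without network poll: " + client.transmitted);
        pollWithNetwork(client);
        System.out.println("transmitted with network poll: " + client.transmitted);
    }
}
```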





[GitHub] [kafka] dajac commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889014500


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+            // set timeout to 0 because we don't want to retry after the error
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+        // set timeout to 0 because we don't want to retry after the error
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());

Review Comment:
   Should we also assert that there are no in-flight requests after calling poll?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));

Review Comment:
   Same question as before.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

Review Comment:
   nit: I think that we need to remove `if coordinator is unknown, make sure we lookup one and` from the above comment (first sentence).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {

Review Comment:
   nit: I would bring back the closing parenthesis and the opening curly brace on the previous line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+        ) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+            // set timeout to 0 because we don't want to retry after the error
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));

Review Comment:
   Do we really need this? I thought that the whole point was to ensure that no requests are sent out when the manual mode is used.





[GitHub] [kafka] showuon commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889496255


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,10 +514,62 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // set timeout to 0 because we expect no requests sent
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertFalse(client.hasInFlightRequests());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertTrue(client.hasInFlightRequests());
+
+            client.respond(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(0));
+            assertFalse(coordinator.coordinatorUnknown());
+            // after we've discovered the coordinator we should send
+            // out the commit request immediately
+            assertTrue(client.hasInFlightRequests());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // set timeout to 0 because we expect no requests sent
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(client.hasInFlightRequests());
+
+        // should try to find coordinator since we are commit async
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+            throw new AssertionError("Commit should not get responses");

Review Comment:
   nit: use `fail` instead, and we might need to log the callback parameters for troubleshooting.
   ```
   fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
   ```


