Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/09 00:16:03 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

guozhangwang opened a new pull request, #12611:
URL: https://github.com/apache/kafka/pull/12611

   Today we may discover the coordinator in either a blocking way (e.g. in `poll`) or a non-blocking way (e.g. in `commitAsync`). In the non-blocking case we poll the underlying network client with a timeout of 0, and that poll should not trigger a wakeup: non-blocking calls should never throw a `WakeupException`.
   
   In this PR I'm trying to fix it in the least intrusive way; a more general fix would potentially be to have two versions of `ensureCoordinatorReady`. The reason is that after our threading refactoring, `ensureCoordinatorReady` will no longer be called by the calling thread and will only be triggered by the background thread, which gives us a much simpler way to ensure that non-blocking functions never throw wake-ups.
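
The intended behaviour can be illustrated with a minimal, self-contained sketch. It is not the Kafka implementation: `ToyClient`, `ToyWakeupException`, and the `checkWakeup` parameter are hypothetical stand-ins for the consumer's network client and `WakeupException`. The assumption is that a wakeup is a pending flag, that a non-blocking poll leaves the flag untouched, and that the next blocking-style poll consumes it and throws.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupSketch {
    /** Hypothetical stand-in for Kafka's WakeupException. */
    static class ToyWakeupException extends RuntimeException {}

    /** Hypothetical stand-in for the consumer's network client. */
    static class ToyClient {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

        /** May be called from another thread to interrupt a blocking call. */
        void wakeup() {
            wakeupPending.set(true);
        }

        /** Consumes a pending wakeup and throws only when checkWakeup is true. */
        void poll(long timeoutMs, boolean checkWakeup) {
            if (checkWakeup && wakeupPending.compareAndSet(true, false)) {
                throw new ToyWakeupException();
            }
            // A real client would perform network I/O here for up to timeoutMs.
        }

        boolean hasPendingWakeup() {
            return wakeupPending.get();
        }
    }

    public static void main(String[] args) {
        ToyClient client = new ToyClient();
        client.wakeup();

        // Non-blocking path (e.g. commitAsync): must not throw.
        client.poll(0L, false);
        System.out.println("wakeup still pending: " + client.hasPendingWakeup());

        // A follow-up blocking-style call should still observe the wakeup.
        try {
            client.poll(0L, true);
            System.out.println("no wakeup delivered");
        } catch (ToyWakeupException e) {
            System.out.println("wakeup delivered to the blocking call");
        }
    }
}
```

The point of the sketch is that the non-blocking call does not swallow the wakeup; it only defers it to the next call that is allowed to throw.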
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r967930755


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {
+        }
+    }
+
+    @Test
+    public void testWakeupWhenBlockingDiscoverCoordinator() throws Exception {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        try {
+            coordinator.ensureCoordinatorReady(mockTime.timer(1));
+            fail("Should have woken up from ensureCoordinatorReady()");
+        } catch (WakeupException ignored) {
+        }

Review Comment:
   nit: can be replaced with `assertThrows`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {

Review Comment:
   nit:
   ```java
   assertThrows(WakeupException.class,
       () -> coordinator.joinGroupIfNeeded(mockTime.timer(0)),
       "Should have woken up from joinGroupIfNeeded()");
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+
+            // if we do not want to block on discovering coordinator at all,
+            // then we should not try to poll in a loop, and should not throw wake-up exception either
+            if (timer.timeoutMs() == 0L) {

Review Comment:
   > would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero?
   
   Yes, I think as long as it's a blocking call, the wakeup exception should be thrown, even with a zero timeout. I've checked the existing javadoc in `KafkaConsumer`: every method that declares that it throws `WakeupException` will still throw it after this change. So I think this PR (1) won't break the API, and (2) fixes the issue for `commitAsync`, which makes sense to me.
   
   > I wonder if it would be better to overload ensureCoordinatorReady with an additional flag?
   
   Yes, as @guozhangwang mentioned in PR description:
   > In this PR I'm trying to fix it in a least intrusive way (a more general fix should be, potentially, to have two versions of ensureCoordinatorReady)
   
   I think this fix is safer, but I don't have a strong opinion about it; I just want to raise the release timing issue here.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r967541336


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+
+            // if we do not want to block on discovering coordinator at all,
+            // then we should not try to poll in a loop, and should not throw wake-up exception either
+            if (timer.timeoutMs() == 0L) {

Review Comment:
   I wonder if this breaks the API in a different way. If the user calls `wakeup()`, would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero? I wonder if it would be better to overload `ensureCoordinatorReady` with an additional flag?





[GitHub] [kafka] hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
URL: https://github.com/apache/kafka/pull/12611




[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244878455

   I'll go ahead and close this since we're going to merge https://github.com/apache/kafka/pull/12626.




[GitHub] [kafka] hachikuji commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r968778503


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+
+            // if we do not want to block on discovering coordinator at all,
+            // then we should not try to poll in a loop, and should not throw wake-up exception either
+            if (timer.timeoutMs() == 0L) {

Review Comment:
   Yeah, I feel it's a bit slippery to leave the logic in place with just a TODO somewhere to go back and fix it. Too many times, the follow-up never happens. Is it really that much additional effort to add an overload?
   
   ```java
   protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
     return ensureCoordinatorReady(timer, true);
   }
   
   private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean checkWakeup) {
   ...
   ```
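
For illustration only, the overload above could be fleshed out along these lines. This is a hedged sketch rather than the code that was merged: the class `OverloadSketch`, the method `ensureCoordinatorReadyNonBlocking`, `ToyWakeupException`, and the use of a plain `long` timeout in place of `Timer` are all assumptions made so that the example is self-contained.

```java
class OverloadSketch {
    /** Hypothetical stand-in for Kafka's WakeupException. */
    static class ToyWakeupException extends RuntimeException {}

    private volatile boolean wakeupPending = false;
    private boolean coordinatorKnown = false;

    /** May be called from another thread. */
    void wakeup() {
        wakeupPending = true;
    }

    /** Existing entry point: callers keep the wakeup-checking behaviour. */
    protected synchronized boolean ensureCoordinatorReady(long timeoutMs) {
        return ensureCoordinatorReady(timeoutMs, true);
    }

    /** Hypothetical entry point for non-blocking callers such as commitAsync. */
    protected synchronized boolean ensureCoordinatorReadyNonBlocking() {
        return ensureCoordinatorReady(0L, false);
    }

    private synchronized boolean ensureCoordinatorReady(long timeoutMs, boolean checkWakeup) {
        if (coordinatorKnown) {
            return true;
        }
        if (checkWakeup && wakeupPending) {
            wakeupPending = false;
            throw new ToyWakeupException();
        }
        // Stand-in for the coordinator lookup; a real client would poll the
        // network for up to timeoutMs and might return false on timeout.
        coordinatorKnown = true;
        return true;
    }

    public static void main(String[] args) {
        OverloadSketch nonBlocking = new OverloadSketch();
        nonBlocking.wakeup();
        System.out.println("non-blocking ready: " + nonBlocking.ensureCoordinatorReadyNonBlocking());

        OverloadSketch blocking = new OverloadSketch();
        blocking.wakeup();
        try {
            blocking.ensureCoordinatorReady(1000L);
        } catch (ToyWakeupException e) {
            System.out.println("blocking call woke up");
        }
    }
}
```

With the flag made explicit, the decision to check for wakeups no longer depends on whether the timeout happens to be zero, which is the concern raised about the `timer.timeoutMs() == 0L` check.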





[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244352156

   @showuon Since Guozhang is out sick, I raised https://github.com/apache/kafka/pull/12626 which addresses your (and my) comments. Please take a look. I'll leave this open for now.




[GitHub] [kafka] guozhangwang commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1241358026

   Calling @philipnee and @showuon for reviews.

