You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 02:43:18 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

ableegoldman opened a new pull request #9671:
URL: https://github.com/apache/kafka/pull/9671


   A race condition between the consumer and hb thread can lead to a failed but non-null `findCoordinatorFuture`, causing the AbstractCoordinator to wait endlessly on the request which it thinks is still in flight. We should move the handling of this future out of the listener callbacks and into the `ensureCoordinatorReady()` method where we can check the exception and clear the future all in one place.  
   
   See ticket for full analysis.
   
   Also starts logging a warning if the consumer is unable to connect to the coordinator for longer than the max poll interval.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747732868


   @ijuma sorry, I missed your earlier response. It's definitely not a trivial bug, yes. The main reasons I didn't formally propose this as a blocker was that it's been around forever, and I'm not confident that the fix is low-risk. WDYT @bbejeck ?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chenhongluo edited a comment on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
chenhongluo edited a comment on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592


   We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a ”coordinator not available“ error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() use synchronized which means clearFindCoordinatorFuture() only can execute after lookupCoordinator() are completed. Now I want to reproduce this bug, is there any good idea?
   
   There are code:
   
   ```
   protected synchronized RequestFuture<Void> lookupCoordinator() {
           if (findCoordinatorFuture == null) {
               // find a node to ask about the coordinator
               Node node = this.client.leastLoadedNode();
               if (node == null) {
                   // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                   // from configuration?
                   log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
                   return RequestFuture.noBrokersAvailable();
               } else {
                   findCoordinatorFuture= sendGroupCoordinatorRequest(node);
               }
           }
           return findCoordinatorFuture;
       }
     private synchronized void clearFindCoordinatorFuture() {
           findCoordinatorFuture = null;
       }
   ```
   
     


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r534377676



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            clearFindCoordinatorFuture();
+            log.debug("FindCoordinator request failed", e);

Review comment:
       nit: I think it's better to just print the e.message in a single line.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -235,11 +235,6 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
             return true;
 
         do {
-            if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {

Review comment:
       The main reason for https://github.com/apache/kafka/pull/7312/files#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R230-R234 is the following:
   
   * inside ensureCoordinatorReady called by the main thread, we may break out of the loop at line 248 below, without knowing what's the final state of the future.
   * and that future could be completed by the other thread (hb) later, and replaced by a new `future` object.
   
   In that case, when the main thread calls ensureCoordinatorReady again, it will "miss" the previous future's contained fatal error.
   
   So thinking about it again, I think we would still want to maintain the exception but only if it is a fatal one inside the handler (i.e. we do not probably need to register another listener just to bookkeep that exception, but just piggy-back this logic inside the handler listener directly), and then inside the while loop, we check if a previous future already gets a fatal exception and if yes, throw it to fail the whole client.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal  exception", fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();

Review comment:
       I think this is a better approach, but we need to be careful about the callee inside hb thread:
   
   ```
   if (findCoordinatorFuture != null || lookupCoordinator().failed())
   ```
   
   i.e. a hb thread sending a discover-coordinator request would also cause a future to be assigned, but that future would only be cleared by the main thread caller. Thinking about that for a sec I think this is okay, but maybe worth having a second pair of eyes over it.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal  exception", fatalException);

Review comment:
       nit: extra space.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387


   > Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it
   
   Hmm... I'm not familiar with SaslXConsumerTest either...


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
+                            if (findCoordinatorFuture != null || lookupCoordinator().failed()) {

Review comment:
       The nested condition is a bit awkward, how about this:
   
   ```
   if (findCoordinatorFuture != null) {
       // if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
       if (findCoordinatorFuture.failed()) {
           clearFindCoordinatorFuture();
       } 
   
       // backoff properly
       AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
   } else {
       lookupCoordinator();
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chenhongluo edited a comment on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
chenhongluo edited a comment on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592


   We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a ”coordinator not available“ error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() use synchronized which means clearFindCoordinatorFuture() only can execute after lookupCoordinator() are completed. Now I want to reproduce this bug, is there any good idea? @ableegoldman @guozhangwang 
   
   There are code:
   
   ```
   protected synchronized RequestFuture<Void> lookupCoordinator() {
           if (findCoordinatorFuture == null) {
               // find a node to ask about the coordinator
               Node node = this.client.leastLoadedNode();
               if (node == null) {
                   // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                   // from configuration?
                   log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
                   return RequestFuture.noBrokersAvailable();
               } else {
                   findCoordinatorFuture= sendGroupCoordinatorRequest(node);
               }
           }
           return findCoordinatorFuture;
       }
     private synchronized void clearFindCoordinatorFuture() {
           findCoordinatorFuture = null;
       }
   ```
   
     


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387


   > Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it
   
   Hmm... I'm not familiar with SaslXConsumerTest either...


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r539567098



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            clearFindCoordinatorFuture();
+            log.debug("FindCoordinator request failed", e);

Review comment:
       if you just have
   
   ```
   log.debug("FindCoordinator request failed due to {}", e)
   ```
   
   Then e.toString would be called which would usually be `e.name(): e.getMessage()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537990731



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal  exception", fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();

Review comment:
       Ah, good point...actually I think that's probably not ok for it to only ever be cleared in the main thread, since eg the main thread might be stuck in long processing while the hb threads should not be blocked from looking up the coordinator.
   So, maybe we should also call `clearFindCoordinatorFuture` inside the hb thread in the `if (findCoordinatorFuture != null || lookupCoordinator().failed())` block (if it did indeed finish and has failed) -- WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537986486



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
 
         @Override
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
-            clearFindCoordinatorFuture();
+            log.debug("FindCoordinator request failed", e);

Review comment:
       You mean like 
   ```
   log.debug("FindCoordinator request failed due to {}", e.getMessage());
   ```
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537992012



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -235,11 +235,6 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
             return true;
 
         do {
-            if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {

Review comment:
       Ok yeah that makes sense. Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767988592


   Merged to trunk and cherrypicked to 2.7


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539


   Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539


   Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537990731



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal  exception", fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();

Review comment:
       Ah, good point...actually I think that's probably not ok for it to only ever be cleared in the main thread, since eg the main thread might be stuck in long processing while the hb threads should not be blocked from looking up the coordinator.
   So, maybe we should also call `clearFindCoordinatorFuture` inside the hb thread in the `if (findCoordinatorFuture != null || lookupCoordinator().failed())` block




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747748917


   > that it's been around forever, and I'm not confident that the fix is low-risk.
   
   Sorry I didn't see this before.  
   
   I'd agree that it is not a blocker for reasons outlined by @ableegoldman.   I'd like to see this get in, but it seems we'd like a little more time for this patch to soak.  
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
+                            if (findCoordinatorFuture != null || lookupCoordinator().failed()) {

Review comment:
       The nested condition is a bit awkward, how about this:
   
   ```
   // try to find coordinator once if we have not yet done so; otherwise backoff properly
   if (findCoordinatorFuture != null) {
       AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
   } else {
       lookupCoordinator();
   }
   
   // now findCoordinatorFuture should not be null;
   // if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
   if (findCoordinatorFuture.failed()) {
       clearFindCoordinatorFuture();
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767983217


   A few tests failed, but no hanging this time:
   
   ```
   StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores -- known to be flaky
   FetcherTest.testEarlierOffsetResetArrivesLate -- hit "TimeoutException: testEarlierOffsetResetArrivesLate() timed out after 10 seconds", I haven't seen this fail before, on this PR or on any other, so I believe it's unrelated. But I ran it 10 times locally to be sure and all passed
   MirrorConnectorsIntegrationSSLTest.testReplication -- in Connect, seems to be unrelated
   StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores -- known flaky, looks environmental (slow startup)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-769616246


   Awesome!!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-859044497


   It should be sufficient to upgrade just the consumers, this is a client-side fix only


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
+                            if (findCoordinatorFuture != null || lookupCoordinator().failed()) {

Review comment:
       The nested condition is a bit awkward, how about this:
   
   ```
   if (findCoordinatorFuture != null) {
       // if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
       if (findCoordinatorFuture.failed()) {
           clearFindCoordinatorFuture();
       } else {
           // otherwise backoff properly
           AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
       }
   } else {
       lookupCoordinator();
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767278329


   Ok I think I've gotten to the bottom of this hanging test, and pushed a fix. Tests seem to be passing reliably for me locally. Aiming to get this merged in the next day or so so let me know if you have any concerns around the latest @guozhangwang 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741798735


   Is this a 2.7.0 blocker? cc @bbejeck 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r539568378



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 break;
             }
 
+            RuntimeException fatalException = null;
+
             if (future.failed()) {
                 if (future.isRetriable()) {
                     log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                     client.awaitMetadataUpdate(timer);
-                } else
-                    throw future.exception();
+                } else {
+                    log.info("FindCoordinator request hit fatal  exception", fatalException);
+                    fatalException = future.exception();
+                }
             } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
+
+            clearFindCoordinatorFuture();

Review comment:
       That makes sense, we can `clearFindCoordinatorFuture` inside the hb thread as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741993693


   > And a restart of the client will get it out of the bad state
   
   This is a massive deal though, right?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chenhongluo edited a comment on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
chenhongluo edited a comment on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592


   We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a ”coordinator not available“ error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() use synchronized which means clearFindCoordinatorFuture() only can execute after lookupCoordinator() are completed. Now I want to reproduce this bug, is there any good idea?
   
   There are code:
   
   protected synchronized RequestFuture<Void> lookupCoordinator() {
           if (findCoordinatorFuture == null) {
               // find a node to ask about the coordinator
               Node node = this.client.leastLoadedNode();
               if (node == null) {
                   // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                   // from configuration?
                   log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
                   return RequestFuture.noBrokersAvailable();
               } else {
                   findCoordinatorFuture= sendGroupCoordinatorRequest(node);
               }
           }
           return findCoordinatorFuture;
       }
   
       private synchronized void clearFindCoordinatorFuture() {
           findCoordinatorFuture = null;
       }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-740339466


   Kicked off 30 versions of the system test which has seemed to be flaky due to this bug:
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4302/


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rqode commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
rqode commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-858645412


   When you experience this issue on 2.6.0 consumers is it enough to only upgrade the kafka client to 2.6.2 or does this fix require a server upgrade? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741977437


   @ijuma no, I don't think it should be a 2.7 blocker. It's definitely not a regression, AFAICT this has been around since the beginning. And a restart of the client will get it out of the bad state


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-770017240


   Cherrypicked to 2.6 as well


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r564899776



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1359,10 +1368,15 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
-                                // the immediate future check ensures that we backoff properly in the case that no
-                                // brokers are available to connect to.
+                            if (findCoordinatorFuture != null) {

Review comment:
       I think the issue is spot-on! The logic here becomes a bit hard to understand for other readers now and I'd suggest update the cmment as:
   
   "Clear the future so that after the backoff in the next iteration, if hb still sees coordinator unknown it will try re-discover the coordinator in case the main thread cannot"
   
   Otherwise, LGTM.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r560600078



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
+                            if (findCoordinatorFuture != null || lookupCoordinator().failed()) {

Review comment:
       Sure yeah that's much better




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9671:
URL: https://github.com/apache/kafka/pull/9671


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767808575


   Ugh, looks like the JDK 15 build still timed out. But I think this was probably environmental based on inspecting the output -- I also tracked down and verified that every run of the previously-hanging `#testCoordinatorFailover` test did complete (and pass) so that does seem to be fixed. Will retrigger the build to be safe


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chenhongluo commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
chenhongluo commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592


   We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a ”coordinator not available“ error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() use synchronized which means clearFindCoordinatorFuture() only can execute after lookupCoordinator() are completed. Now I want to reproduce this bug, is there any good idea?
   
   There are code:
   protected synchronized RequestFuture<Void> lookupCoordinator() {
           if (findCoordinatorFuture == null) {
               // find a node to ask about the coordinator
               Node node = this.client.leastLoadedNode();
               if (node == null) {
                   // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                   // from configuration?
                   log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
                   return RequestFuture.noBrokersAvailable();
               } else {
                   findCoordinatorFuture= sendGroupCoordinatorRequest(node);
               }
           }
           return findCoordinatorFuture;
       }
   
       private synchronized void clearFindCoordinatorFuture() {
           findCoordinatorFuture = null;
       }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r564902848



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1359,10 +1368,15 @@ public void run() {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture != null || lookupCoordinator().failed())
-                                // the immediate future check ensures that we backoff properly in the case that no
-                                // brokers are available to connect to.
+                            if (findCoordinatorFuture != null) {

Review comment:
       SG




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-736964914


   Waiting to add tests until I get some sanity checks on this proposal


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747738686


   Let's aim for 2.7.1. :) 2.7.0 is done at this point.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org