Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 02:43:18 UTC
[GitHub] [kafka] ableegoldman opened a new pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman opened a new pull request #9671:
URL: https://github.com/apache/kafka/pull/9671
A race condition between the consumer thread and the heartbeat (hb) thread can lead to a failed but non-null `findCoordinatorFuture`, causing the AbstractCoordinator to wait endlessly on a request that it thinks is still in flight. We should move the handling of this future out of the listener callbacks and into the `ensureCoordinatorReady()` method, where we can check the exception and clear the future all in one place.
See ticket for full analysis.
Also starts logging a warning if the consumer is unable to connect to the coordinator for longer than the max poll interval.
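To make the failure mode concrete, here is a minimal sketch rather than Kafka's actual code. `StuckFutureSketch`, its methods, and the use of `CompletableFuture` in place of Kafka's `RequestFuture` are all assumptions made for illustration. It models a lookup that reuses any non-null future, and a caller that either never clears a failed future or inspects and clears it in one place.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AbstractCoordinator; not the real Kafka class.
class StuckFutureSketch {
    // null means "no request in flight"
    private CompletableFuture<Void> findCoordinatorFuture;
    int requestsSent = 0;

    // Reuses the current future if there is one, otherwise "sends" a new request.
    synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            requestsSent++;
            findCoordinatorFuture = new CompletableFuture<>();
        }
        return findCoordinatorFuture;
    }

    synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    // One pass of a ready-check loop: inspect the future and, if requested,
    // clear it in the same place once it has completed.
    synchronized boolean attemptOnce(boolean clearAfterInspecting) {
        CompletableFuture<Void> future = lookupCoordinator();
        boolean failed = future.isCompletedExceptionally();
        if (clearAfterInspecting && future.isDone()) {
            clearFindCoordinatorFuture();
        }
        return failed;
    }

    // Simulates the in-flight request failing (assumes a request is in flight).
    synchronized void failInFlightRequest() {
        findCoordinatorFuture.completeExceptionally(new RuntimeException("simulated disconnect"));
    }

    // Runs three attempts with a failure after the first one and reports
    // how many requests were actually sent.
    static int requestsAfterFailure(boolean clearAfterInspecting) {
        StuckFutureSketch c = new StuckFutureSketch();
        c.attemptOnce(clearAfterInspecting); // sends the first request, still in flight
        c.failInFlightRequest();
        c.attemptOnce(clearAfterInspecting); // observes the failure
        c.attemptOnce(clearAfterInspecting); // retries only if the failed future was cleared
        return c.requestsSent;
    }

    public static void main(String[] args) {
        System.out.println("requests sent without clearing: " + requestsAfterFailure(false));
        System.out.println("requests sent with clearing: " + requestsAfterFailure(true));
    }
}
```

In this model, a failed future that is never cleared keeps being returned, so no second request is ever sent; clearing it where it is inspected lets the next attempt retry.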
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747732868
@ijuma sorry, I missed your earlier response. It's definitely not a trivial bug, yes. The main reasons I didn't formally propose this as a blocker were that it's been around forever, and I'm not confident that the fix is low-risk. WDYT @bbejeck ?
[GitHub] [kafka] chenhongluo edited a comment on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
chenhongluo edited a comment on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592
We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a "coordinator not available" error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() are synchronized, which means clearFindCoordinatorFuture() can only execute after lookupCoordinator() has completed. I would like to reproduce this bug; does anyone have a good idea how? @ableegoldman @guozhangwang
Here is the code:
```
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
            return RequestFuture.noBrokersAvailable();
        } else {
            findCoordinatorFuture = sendGroupCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}

private synchronized void clearFindCoordinatorFuture() {
    findCoordinatorFuture = null;
}
```
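One point relevant to the question above: Java's `synchronized` is reentrant, so it excludes other threads but not a callback that runs on the thread already holding the lock. The sketch below shows one hypothetical interleaving in which a request fails synchronously during the send. It is an illustration only, not a confirmed description of the race analysed in the ticket; `ReentrantClearSketch`, `sendRequest`, and the use of `CompletableFuture` instead of `RequestFuture` are assumed names and stand-ins.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model; not the real AbstractCoordinator.
class ReentrantClearSketch {
    CompletableFuture<Void> findCoordinatorFuture;

    synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // The failure callback runs inside sendRequest(), before this assignment happens.
            findCoordinatorFuture = sendRequest();
        }
        return findCoordinatorFuture;
    }

    synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    // Assumed behaviour: the send fails immediately and the failure callback
    // is invoked on the calling thread.
    private CompletableFuture<Void> sendRequest() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("simulated immediate failure"));
        // Reentrant call: this thread already holds the monitor, so it does not block.
        // The field is still null here, so clearing it has no effect.
        clearFindCoordinatorFuture();
        return future;
    }

    // True if a single lookup leaves the field holding a failed future.
    static boolean endsUpFailedButNonNull() {
        ReentrantClearSketch c = new ReentrantClearSketch();
        c.lookupCoordinator();
        return c.findCoordinatorFuture != null
                && c.findCoordinatorFuture.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("failed but non-null: " + endsUpFailedButNonNull());
    }
}
```

In this model the clear happens before the assignment even though both methods are synchronized, which leaves a failed future in the field with nothing left to clear it.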
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r534377676
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
- clearFindCoordinatorFuture();
+ log.debug("FindCoordinator request failed", e);
Review comment:
nit: I think it's better to just print the e.message in a single line.
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -235,11 +235,6 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
return true;
do {
- if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {
Review comment:
The main reason for https://github.com/apache/kafka/pull/7312/files#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R230-R234 is the following:
* Inside ensureCoordinatorReady, called by the main thread, we may break out of the loop at line 248 below without knowing the final state of the future.
* That future could later be completed by the other thread (hb) and replaced by a new `future` object.
In that case, when the main thread calls ensureCoordinatorReady again, it will "miss" the fatal error contained in the previous future.
So, thinking about it again, I think we would still want to keep the exception inside the handler, but only if it is a fatal one (i.e. we probably do not need to register another listener just to bookkeep that exception; we can piggy-back this logic on the handler listener directly). Then, inside the while loop, we check whether a previous future already hit a fatal exception and, if so, throw it to fail the whole client.
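The bookkeeping described in this comment could look roughly like the sketch below. It is an illustration under assumptions, not the code that was merged: `FatalErrorSketch`, `onFailure`, `ensureReady`, and the local `RetriableException` class are hypothetical stand-ins. The failure handler records only non-retriable errors, and the next ready check rethrows a recorded error even if the future that carried it has since been replaced.

```java
// Hypothetical model of "remember only fatal errors in the handler".
class FatalErrorSketch {
    // Local stand-in for Kafka's RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    private RuntimeException fatalFindCoordinatorException;

    // Called by whichever thread completes the request with a failure.
    synchronized void onFailure(RuntimeException e) {
        if (!(e instanceof RetriableException)) {
            fatalFindCoordinatorException = e;
        }
    }

    // Called by the main thread; surfaces a previously recorded fatal error.
    synchronized void ensureReady() {
        if (fatalFindCoordinatorException != null) {
            RuntimeException e = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null;
            throw e;
        }
        // Otherwise coordinator discovery would continue here (omitted).
    }

    // True if ensureReady() rethrows exactly the failure that was recorded.
    static boolean rethrows(RuntimeException failure) {
        FatalErrorSketch c = new FatalErrorSketch();
        c.onFailure(failure);
        try {
            c.ensureReady();
            return false;
        } catch (RuntimeException e) {
            return e == failure;
        }
    }

    public static void main(String[] args) {
        System.out.println("retriable rethrown: " + rethrows(new RetriableException("timeout")));
        System.out.println("fatal rethrown: " + rethrows(new IllegalStateException("auth failed")));
    }
}
```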
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
break;
}
+ RuntimeException fatalException = null;
+
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
client.awaitMetadataUpdate(timer);
- } else
- throw future.exception();
+ } else {
+ log.info("FindCoordinator request hit fatal exception", fatalException);
+ fatalException = future.exception();
+ }
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
+
+ clearFindCoordinatorFuture();
Review comment:
I think this is a better approach, but we need to be careful about the caller inside the hb thread:
```
if (findCoordinatorFuture != null || lookupCoordinator().failed())
```
i.e. a hb thread sending a discover-coordinator request would also cause a future to be assigned, but that future would only be cleared by the main-thread caller. Thinking about it for a second, I think this is okay, but it may be worth having a second pair of eyes on it.
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
break;
}
+ RuntimeException fatalException = null;
+
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
client.awaitMetadataUpdate(timer);
- } else
- throw future.exception();
+ } else {
+ log.info("FindCoordinator request hit fatal exception", fatalException);
Review comment:
nit: extra space.
[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764063387
> Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it
Hmm... I'm not familiar with SaslXConsumerTest either...
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
+ if (findCoordinatorFuture != null || lookupCoordinator().failed()) {
Review comment:
The nested condition is a bit awkward, how about this:
```
if (findCoordinatorFuture != null) {
    // if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
    if (findCoordinatorFuture.failed()) {
        clearFindCoordinatorFuture();
    }
    // backoff properly
    AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
    lookupCoordinator();
}
```
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r539567098
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
- clearFindCoordinatorFuture();
+ log.debug("FindCoordinator request failed", e);
Review comment:
if you just have
```
log.debug("FindCoordinator request failed due to {}", e)
```
Then `e.toString()` would be called, which usually yields the exception's class name followed by `: ` and `e.getMessage()`.
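For reference, the difference between the two strings can be checked with plain Java, independent of any logging library. `Throwable.toString()` returns the class name followed by `": "` and the localized message (or only the class name when the message is null). How a logger treats a trailing `Throwable` argument depends on the logging framework and is not shown here. `ExceptionTextSketch` and its helper methods are hypothetical names.

```java
// Compares the text produced by toString() and getMessage() for the same exception.
class ExceptionTextSketch {
    static String viaToString(Throwable e) {
        return e.toString();
    }

    static String viaGetMessage(Throwable e) {
        return e.getMessage();
    }

    public static void main(String[] args) {
        RuntimeException e = new IllegalStateException("node disconnected");
        // prints "java.lang.IllegalStateException: node disconnected"
        System.out.println(viaToString(e));
        // prints "node disconnected"
        System.out.println(viaGetMessage(e));
    }
}
```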
[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537990731
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
break;
}
+ RuntimeException fatalException = null;
+
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
client.awaitMetadataUpdate(timer);
- } else
- throw future.exception();
+ } else {
+ log.info("FindCoordinator request hit fatal exception", fatalException);
+ fatalException = future.exception();
+ }
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
+
+ clearFindCoordinatorFuture();
Review comment:
Ah, good point... actually I think it's probably not ok for it to only ever be cleared in the main thread, since e.g. the main thread might be stuck in long processing, and the hb thread should not be blocked from looking up the coordinator in the meantime.
So, maybe we should also call `clearFindCoordinatorFuture` inside the hb thread in the `if (findCoordinatorFuture != null || lookupCoordinator().failed())` block (if it did indeed finish and has failed) -- WDYT?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537986486
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -853,7 +844,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
- clearFindCoordinatorFuture();
+ log.debug("FindCoordinator request failed", e);
Review comment:
You mean like
```
log.debug("FindCoordinator request failed due to {}", e.getMessage());
```
?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r537992012
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -235,11 +235,6 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
return true;
do {
- if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {
Review comment:
Ok yeah that makes sense. Thanks
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767988592
Merged to trunk and cherrypicked to 2.7
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-764053539
Weird -- these changes seem to be causing the `SaslXConsumerTest` family of tests to hang. I'm not very (or at all) familiar with these tests so I haven't found anything yet but I'm actively looking into it
[GitHub] [kafka] bbejeck commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747748917
> that it's been around forever, and I'm not confident that the fix is low-risk.
Sorry I didn't see this before.
I'd agree that it is not a blocker for reasons outlined by @ableegoldman. I'd like to see this get in, but it seems we'd like a little more time for this patch to soak.
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
+ if (findCoordinatorFuture != null || lookupCoordinator().failed()) {
Review comment:
The nested condition is a bit awkward, how about this:
```
// try to find coordinator once if we have not yet done so; otherwise backoff properly
if (findCoordinatorFuture != null) {
    AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
    lookupCoordinator();
}
// now findCoordinatorFuture should not be null;
// if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
if (findCoordinatorFuture.failed()) {
    clearFindCoordinatorFuture();
}
```
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767983217
A few tests failed, but no hanging this time:
```
StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores -- known to be flaky
FetcherTest.testEarlierOffsetResetArrivesLate -- hit "TimeoutException: testEarlierOffsetResetArrivesLate() timed out after 10 seconds", I haven't seen this fail before, on this PR or on any other, so I believe it's unrelated. But I ran it 10 times locally to be sure and all passed
MirrorConnectorsIntegrationSSLTest.testReplication -- in Connect, seems to be unrelated
StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores -- known flaky, looks environmental (slow startup)
```
[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-769616246
Awesome!!
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-859044497
It should be sufficient to upgrade just the consumers; this is a client-side fix only.
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r558623815
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
+ if (findCoordinatorFuture != null || lookupCoordinator().failed()) {
Review comment:
The nested condition is a bit awkward, how about this:
```
if (findCoordinatorFuture != null) {
    // if it has failed, clear it so that hb thread can try discover again in the next loop in case main thread is busy
    if (findCoordinatorFuture.failed()) {
        clearFindCoordinatorFuture();
    } else {
        // otherwise backoff properly
        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
    }
} else {
    lookupCoordinator();
}
```
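The branch structure in this suggestion can be expressed as a small pure function, which makes the three cases easy to check in isolation. This is a sketch under assumptions: `HeartbeatStepSketch`, the `Action` enum, and `CompletableFuture` as a stand-in for `RequestFuture` are illustrative and do not appear in the PR.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the heartbeat thread's decision when the coordinator is unknown.
class HeartbeatStepSketch {
    enum Action { LOOKUP, CLEAR_FAILED, BACKOFF }

    static Action nextAction(CompletableFuture<Void> findCoordinatorFuture) {
        if (findCoordinatorFuture == null) {
            // No request in flight: start coordinator discovery.
            return Action.LOOKUP;
        }
        if (findCoordinatorFuture.isCompletedExceptionally()) {
            // Failed request: clear it so the next loop iteration can retry,
            // even if the main thread is busy.
            return Action.CLEAR_FAILED;
        }
        // A future exists and has not failed: back off before checking again.
        return Action.BACKOFF;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated failure"));
        System.out.println(nextAction(null));
        System.out.println(nextAction(failed));
        System.out.println(nextAction(new CompletableFuture<>()));
    }
}
```

Separating the decision from the side effects (`wait`, `lookupCoordinator`, `clearFindCoordinatorFuture`) is a design choice of this sketch only; the suggestion in the review keeps them inline.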
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767278329
Ok, I think I've gotten to the bottom of this hanging test, and pushed a fix. Tests seem to be passing reliably for me locally. I'm aiming to get this merged in the next day or so, so let me know if you have any concerns about the latest changes, @guozhangwang
[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741798735
Is this a 2.7.0 blocker? cc @bbejeck
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r539568378
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -248,18 +243,26 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
break;
}
+ RuntimeException fatalException = null;
+
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
client.awaitMetadataUpdate(timer);
- } else
- throw future.exception();
+ } else {
+ log.info("FindCoordinator request hit fatal exception", fatalException);
+ fatalException = future.exception();
+ }
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
+
+ clearFindCoordinatorFuture();
Review comment:
That makes sense, we can `clearFindCoordinatorFuture` inside the hb thread as well.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741993693
> And a restart of the client will get it out of the bad state
This is a massive deal though, right?
----------------------------------------------------------------
[GitHub] [kafka] chenhongluo edited a comment on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
chenhongluo edited a comment on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-1018210592
We had the same problem with the 0.10.2 client. The findCoordinatorFuture was not being cleared, which resulted in a "coordinator not available" error for every commitOffsetAsync call. We use consumer.assign() to consume. What I can't understand is that both lookupCoordinator() and clearFindCoordinatorFuture() are synchronized, which means clearFindCoordinatorFuture() can only execute after lookupCoordinator() has completed. I'd like to reproduce this bug; does anyone have a good way to do that?
Here is the code:
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            log.warn("No broker available to send GroupCoordinator request for group {}", groupId);
            return RequestFuture.noBrokersAvailable();
        } else {
            findCoordinatorFuture = sendGroupCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}
private synchronized void clearFindCoordinatorFuture() {
    findCoordinatorFuture = null;
}
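One way to see why `synchronized` alone does not rule out the stuck state: the lock protects each method call, but the future completes (or fails) asynchronously, outside of either call. The sketch below is a simplified model, not the real client code and not the exact interleaving analyzed in the ticket. `StaleFutureSketch`, `FakeFuture`, `fail()` and `requestsSent()` are hypothetical names introduced for illustration; only `lookupCoordinator()` and `clearFindCoordinatorFuture()` mirror the methods quoted above.

```java
class StaleFutureSketch {

    /** Hypothetical stand-in for the request future; it is completed outside the coordinator lock. */
    static final class FakeFuture {
        private volatile boolean failed;

        void fail() {
            failed = true;
        }

        boolean failed() {
            return failed;
        }
    }

    private FakeFuture findCoordinatorFuture;
    private int requestsSent;

    /** Mirrors the quoted method: a new request is only sent when no future is cached. */
    synchronized FakeFuture lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            requestsSent++;
            findCoordinatorFuture = new FakeFuture();
        }
        return findCoordinatorFuture;
    }

    synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    synchronized int requestsSent() {
        return requestsSent;
    }
}
```

In this model, if the cached future fails and nothing calls `clearFindCoordinatorFuture()` afterwards, every later `lookupCoordinator()` call returns the same failed future and no new request is ever sent. That matches the symptom described in the PR summary (a failed but non-null future), whatever sequence of events leads to the missed clear.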
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-740339466
Kicked off 30 runs of the system test that appears to be flaky due to this bug:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4302/
----------------------------------------------------------------
[GitHub] [kafka] rqode commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
rqode commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-858645412
If you hit this issue with 2.6.0 consumers, is it enough to upgrade only the Kafka client to 2.6.2, or does this fix also require a server upgrade? Thanks
--
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-741977437
@ijuma no, I don't think it should be a 2.7 blocker. It's definitely not a regression; AFAICT this has been around since the beginning. And a restart of the client will get it out of the bad state.
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-770017240
Cherrypicked to 2.6 as well
----------------------------------------------------------------
[GitHub] [kafka] guozhangwang commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r564899776
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1359,10 +1368,15 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
- // the immediate future check ensures that we backoff properly in the case that no
- // brokers are available to connect to.
+ if (findCoordinatorFuture != null) {
Review comment:
I think the issue is spot-on! The logic here has become a bit hard for other readers to follow, so I'd suggest updating the comment to:
"Clear the future so that after the backoff in the next iteration, if hb still sees coordinator unknown it will try to re-discover the coordinator in case the main thread cannot"
Otherwise, LGTM.
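For readers following along, the heartbeat-thread behavior described by the suggested comment can be modeled as a small state machine. This is a sketch under assumptions, not the merged code: `HeartbeatStepSketch`, `heartbeatStep()`, `lookups()` and `backoffs()` are hypothetical names, the `backoffs` counter stands in for waiting out the retry backoff, and the `else` branch that issues a new lookup is assumed because the quoted hunk does not show it.

```java
class HeartbeatStepSketch {

    private Object findCoordinatorFuture; // non-null means a lookup is cached
    private int lookups;
    private int backoffs;

    synchronized void lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            findCoordinatorFuture = new Object();
            lookups++;
        }
    }

    synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    /** One heartbeat-loop iteration while the coordinator is still unknown. */
    synchronized void heartbeatStep() {
        if (findCoordinatorFuture != null) {
            // Clear the cached future so the next iteration can re-discover the
            // coordinator if the main thread has not done so in the meantime.
            clearFindCoordinatorFuture();
            backoffs++; // placeholder for sleeping or waiting for the retry backoff
        } else {
            // Assumed branch: nothing cached, so start a fresh lookup.
            lookupCoordinator();
        }
    }

    synchronized int lookups() {
        return lookups;
    }

    synchronized int backoffs() {
        return backoffs;
    }
}
```

Calling `heartbeatStep()` repeatedly alternates between issuing a lookup and clearing it after a backoff, so a stale future can survive at most one iteration in this model.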
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r560600078
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1342,10 +1351,17 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
+ if (findCoordinatorFuture != null || lookupCoordinator().failed()) {
Review comment:
Sure yeah that's much better
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman merged pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9671:
URL: https://github.com/apache/kafka/pull/9671
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-767808575
Ugh, looks like the JDK 15 build still timed out. But I think this was probably environmental based on inspecting the output -- I also tracked down and verified that every run of the previously-hanging `#testCoordinatorFailover` test did complete (and pass) so that does seem to be fixed. Will retrigger the build to be safe
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman commented on a change in pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#discussion_r564902848
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1359,10 +1368,15 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture != null || lookupCoordinator().failed())
- // the immediate future check ensures that we backoff properly in the case that no
- // brokers are available to connect to.
+ if (findCoordinatorFuture != null) {
Review comment:
SG
----------------------------------------------------------------
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-736964914
Waiting to add tests until I get some sanity checks on this proposal
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-747738686
Let's aim for 2.7.1. :) 2.7.0 is done at this point.
----------------------------------------------------------------