Posted to jira@kafka.apache.org by "Jaebin Yoon (Jira)" <ji...@apache.org> on 2020/12/09 03:28:00 UTC

[jira] [Comment Edited] (KAFKA-10827) Consumer group coordinator node never gets updated for manual partition assignment with infrequent requests

    [ https://issues.apache.org/jira/browse/KAFKA-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246273#comment-17246273 ] 

Jaebin Yoon edited comment on KAFKA-10827 at 12/9/20, 3:27 AM:
---------------------------------------------------------------

[~Jack-Lee] I don't think your patch would work for our case. The connection is actually in the failed state. The real problem is that client.connectionDelay() always returns 0.


{code:java}
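    public long connectionDelay(String id, long now) {
        NodeConnectionState state = nodeState.get(id);
        // No state recorded for this node: nothing to wait for.
        if (state == null) return 0;
        if (state.state.isDisconnected()) {
            // Remaining backoff, floored at 0 once the backoff window has passed.
            long timeWaited = now - state.lastConnectAttemptMs;
            return Math.max(state.reconnectBackoffMs - timeWaited, 0);
        // ... (rest of the method omitted)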
{code}
timeWaited is always greater than state.reconnectBackoffMs in our case because requests are infrequent (and each request times out, since it is sent to a non-existent broker). So connectionDelay returns 0, which makes isUnavailable return false.
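
To make the arithmetic concrete, here is a minimal standalone sketch using the numbers from the issue description. BackoffSketch and its two static methods are hypothetical stand-ins, not Kafka classes; connectionDelay copies only the disconnected branch quoted above, and isUnavailable assumes the node counts as unavailable only while it is disconnected and still inside its backoff window, as the description suggests.

```java
// Hypothetical stand-in for illustration only; NOT Kafka's actual classes.
final class BackoffSketch {

    // Mirrors the disconnected branch quoted above: remaining backoff, floored at 0.
    static long connectionDelay(long reconnectBackoffMs, long lastConnectAttemptMs, long now) {
        long timeWaited = now - lastConnectAttemptMs;
        return Math.max(reconnectBackoffMs - timeWaited, 0);
    }

    // Assumption: a node is "unavailable" only while disconnected AND still backing off.
    static boolean isUnavailable(boolean disconnected, long delayMs) {
        return disconnected && delayMs > 0;
    }

    public static void main(String[] args) {
        long reconnectBackoffMaxMs = 1_000;  // reconnect.backoff.max.ms default
        long requestTimeoutMs = 120_000;     // request.timeout.ms
        long commitIntervalMs = 300_000;     // one offset commit every 5 minutes

        // Assume the last connect attempt happened no later than the request timeout.
        long lastConnectAttemptMs = requestTimeoutMs;
        long now = commitIntervalMs;         // time of the next commit

        long delay = connectionDelay(reconnectBackoffMaxMs, lastConnectAttemptMs, now);
        System.out.println("connectionDelay = " + delay);
        System.out.println("isUnavailable = " + isUnavailable(true, delay));
    }
}
```

With these values timeWaited is 180000 ms against a backoff of at most 1000 ms, so the delay is 0 and the dead coordinator still looks available on every commit cycle.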



> Consumer group coordinator node never gets updated for manual partition assignment with infrequent requests
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10827
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10827
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.2.0
>            Reporter: Jaebin Yoon
>            Assignee: lqjacklee
>            Priority: Major
>         Attachments: KAFKA-10827.patch
>
>
> We've run into a situation where the coordinator node in the consumer never gets updated with the new coordinator when the coordinator broker gets replaced with a new instance. Once the consumer gets into this mode, the consumer keeps trying to connect to the old coordinator and never recovers unless restarted.
> This happens when the consumer uses manual partition assignment, commits offsets very infrequently (every 5 minutes), and the coordinator broker is not reachable (its IP address and hostname are gone in a cloud environment).
> The exception the consumer keeps getting is:
> {code:java}
> Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 120000 ms.
> {code}
> In this error condition we could see many connections in the *SYN_SENT* TCP state from the consumer app to the old hostname.
> In the current manual partition assignment scenario, the only way for the coordinator to get updated is through checkAndGetCoordinator in AbstractCoordinator, but in our case this is called only when committing offsets, every 5 minutes.
> checkAndGetCoordinator currently relies on ConsumerNetworkClient.isUnavailable, which returns false unless the network client is within its reconnect backoff window. That window uses the default values (reconnect.backoff.ms = 50, reconnect.backoff.max.ms = 1000), while request.timeout.ms is 120000. In this scenario, ConsumerNetworkClient.isUnavailable for the old coordinator node always returns false, so checkAndGetCoordinator keeps the old coordinator node forever.
> In essence, the consumer sends one offset commit request to the coordinator every 5 minutes. That request times out, and when the consumer calls checkAndGetCoordinator 5 minutes later, it returns the old coordinator again because the last attempt was more than 3 minutes ago (given the 2-minute request.timeout.ms). This repeats forever.
> The current implementation assumes that there are many requests to the coordinator (normally through the heartbeat thread, etc.) so that a new coordinator gets detected, but with such infrequent requests the consumer never moves off the old coordinator.
> We had to restart the consumer to recover from this condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)