Posted to jira@kafka.apache.org by "Tom Lee (Jira)" <ji...@apache.org> on 2019/10/14 04:09:00 UTC

[jira] [Comment Edited] (KAFKA-8950) KafkaConsumer stops fetching

    [ https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950702#comment-16950702 ] 

Tom Lee edited comment on KAFKA-8950 at 10/14/19 4:08 AM:
----------------------------------------------------------

Hm, not sure I see what you mean. I think you're right that checkDisconnects could be called by the coordinator/heartbeat thread at the same time that send is invoked by the fetcher (edit: or the other way around!), so no disagreement there. But RequestFuture.addListener() will enqueue the listener in a ConcurrentLinkedQueue, then check whether the future was previously succeeded or failed by reading an atomic reference before invoking fire{Success,Failure}, which will then invoke the enqueued listener.

So say we enqueue the listener and then "see" that the future is neither succeeded nor failed. It's then guaranteed that the heartbeat/coordinator thread will invoke the listener, because the enqueue "happened-before" the atomic reference write. On the other hand, if the atomic reference write "happened-before" our check of whether the future is succeeded or failed, addListener will execute fireSuccess/fireFailure, which will invoke the listener logic itself.
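
To make the ordering concrete, here's a minimal, hypothetical sketch of the pattern being described (the class, method and field names below are made up for illustration and are not the actual RequestFuture implementation):

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch only: SketchFuture, complete() and INCOMPLETE are made-up
// names, not the real RequestFuture code. It just illustrates the
// enqueue-then-check / publish-then-drain ordering argument above.
class SketchFuture<T> {
    private static final Object INCOMPLETE = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE);
    private final ConcurrentLinkedQueue<Consumer<T>> listeners = new ConcurrentLinkedQueue<>();

    void addListener(Consumer<T> listener) {
        listeners.add(listener);            // (1) enqueue first
        if (result.get() != INCOMPLETE)     // (2) then check for completion
            fireSuccess();                  //     already completed: fire from this thread
        // If (2) still sees INCOMPLETE, the completing thread's drain in (3)
        // runs after its write to 'result', which comes after our read of it,
        // which comes after our enqueue in (1), so the drain sees the listener.
    }

    void complete(T value) {
        if (result.compareAndSet(INCOMPLETE, value))   // (3) publish the result exactly once,
            fireSuccess();                             //     then drain the listener queue
    }

    @SuppressWarnings("unchecked")
    private void fireSuccess() {
        Object value = result.get();
        Consumer<T> listener;
        while ((listener = listeners.poll()) != null)  // poll() ensures each listener fires at most once
            listener.accept((T) value);
    }
}
{code}

Whichever of (2) or (3) observes the completed state, some thread drains the queue, and poll() guarantees a given listener is handed out to exactly one drainer, so a listener added concurrently with completion still fires exactly once.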

It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the complete()/raise() call somewhere, but I'd be happy to be wrong if it means we can get this fixed. :)




> KafkaConsumer stops fetching
> ----------------------------
>
>                 Key: KAFKA-8950
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8950
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.3.0
>         Environment: linux
>            Reporter: Will James
>            Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no records from the broker with every poll, and from most of the Kafka metrics in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is zero lag. In addition, we see that the heartbeat rate stays unchanged from before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to see that the consumer is committing the same offset on the auto commit interval; however, the offset does not move, and the lag from the broker's perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher is in a state where it believes there are nodesWithPendingFetchRequests.
> However, looking at the state of the fetch latency sensor, specifically the fetch rate, I can see that the samples were not updated for a long period of time (in fact, precisely the amount of time the problem was occurring in our application, around 50 hours - we have alerting on other metrics but not the fetch rate, so we didn't notice the problem until a customer complained).
> In this example, the consumer was processing around 40 messages per second, with an average size of about 10kb, although most of the other examples of this have happened with higher volume (250 messages / second, around 23kb per message on average).
> I have spent some time investigating the issue on our end, and will continue to do so as time allows, however I wanted to raise this as an issue because it may be affecting other people.
> Please let me know if you have any questions or need additional information. I doubt I can provide heap dumps unfortunately, but I can provide further information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)