You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Stanislav Kozlovski (JIRA)" <ji...@apache.org> on 2019/02/28 16:47:00 UTC

[jira] [Resolved] (KAFKA-8016) Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group

     [ https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-8016.
----------------------------------------
    Resolution: Duplicate

This is a symptom of https://issues.apache.org/jira/browse/KAFKA-7831

> Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>
> I think the consumer heartbeat thread has a possibility for a race condition that can crash it.
> I have seen the following client exception after a consumer group rebalance:
> {code:java}
> INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
> INFO  Fetcher  Resetting offset for partition _ to offset 32108462.
> java.lang.IllegalStateException: No current assignment for partition X
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
> at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
> at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
> at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
> at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
> at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
> at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
> {code}
> The logs also had this message in a close timeframe:
> {code:java}
> INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
>  
> After investigating, I see that there might be a race condition:
>  
>  [Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` request to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the Heartbeat thread to initiate the code that handles the response in its run loop([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247])
>  
> updateFetchPositions() is called from the public methods `Consumer#position()` and  `Consumer#poll()`.
> The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the offset response handling by the heartbeat thread takes place. This results in `subscriptions.seek()` throwing an IllegalStateException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)