Posted to jira@kafka.apache.org by "Stanislav Kozlovski (JIRA)" <ji...@apache.org> on 2019/02/28 12:45:00 UTC

[jira] [Updated] (KAFKA-8016) Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group

     [ https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski updated KAFKA-8016:
---------------------------------------
    Description: 
I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message at around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]
{code}
 

After investigating, I see that there might be a race condition: [ConsumerNetworkClient.java#L247|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the heartbeat thread to handle the response here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036] -> [2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]

This happens when either `Consumer#position()` or `Consumer#poll()` gets called.
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. This results in `subscriptions.seek()` throwing an IllegalStateException.
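
The interleaving described above can be reproduced in isolation. The following is only a minimal sketch under stated assumptions: `ToySubscriptionState`, `RebalanceRaceSketch`, and `seekIfAssigned` are hypothetical names invented for illustration and are not Kafka classes or methods. A latch forces the ordering "partition revoked first, offset response handled second" so the failure is deterministic. The guarded variant shows one possible mitigation (checking the assignment and seeking in a single critical section); it is not necessarily how this issue should be or was resolved in Kafka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified stand-in for the consumer's subscription state.
class ToySubscriptionState {
    private final Map<String, Long> positions = new HashMap<>();

    synchronized void assign(String partition) { positions.put(partition, 0L); }

    synchronized void revoke(String partition) { positions.remove(partition); }

    // Mirrors the failure mode in the stack trace: seeking an unassigned partition throws.
    synchronized void seek(String partition, long offset) {
        if (!positions.containsKey(partition))
            throw new IllegalStateException("No current assignment for partition " + partition);
        positions.put(partition, offset);
    }

    // Assumed mitigation: check and seek atomically, skipping stale responses.
    synchronized boolean seekIfAssigned(String partition, long offset) {
        if (!positions.containsKey(partition))
            return false;
        positions.put(partition, offset);
        return true;
    }
}

class RebalanceRaceSketch {
    // Returns "reset", "skipped", or the exception message seen by the toy heartbeat thread.
    static String run(boolean guarded) {
        ToySubscriptionState subscriptions = new ToySubscriptionState();
        subscriptions.assign("X");

        CountDownLatch revoked = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not run");

        // Plays the role of the heartbeat thread handling the offset response.
        Thread heartbeat = new Thread(() -> {
            try {
                revoked.await(); // force the problematic ordering
                if (guarded) {
                    outcome.set(subscriptions.seekIfAssigned("X", 32110985L) ? "reset" : "skipped");
                } else {
                    subscriptions.seek("X", 32110985L);
                    outcome.set("reset");
                }
            } catch (IllegalStateException e) {
                outcome.set(e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-heartbeat");
        heartbeat.start();

        // Plays the role of the rebalance preparation revoking the partition.
        subscriptions.revoke("X");
        revoked.countDown();

        try {
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "No current assignment for partition X"
        System.out.println(run(true));  // prints "skipped"
    }
}
```

In the unguarded run the toy heartbeat thread hits the same `IllegalStateException` message as in the stack trace; in the guarded run the stale offset response is simply dropped.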

  was:
I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message at around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]
{code}
 

After investigating, I see that there might be a race condition: [ConsumerNetworkClient.java#L247|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the heartbeat thread to handle the response here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036] -> [2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]
This happens when either `Consumer#position()` or `Consumer#poll()` gets called.
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. This results in `subscriptions.seek()` throwing an IllegalStateException.


> Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>
> I have seen the following client exception after a consumer group rebalance:
> {code:java}
> INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
> INFO  Fetcher  Resetting offset for partition _ to offset 32108462.
> java.lang.IllegalStateException: No current assignment for partition X
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
> at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
> at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
> at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
> at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
> at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
> at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
> {code}
> The logs also contained this message at around the same time:
> {code:java}
> INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]
> {code}
>  
> After investigating, I see that there might be a race condition: [ConsumerNetworkClient.java#L247|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]
>
> [Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the heartbeat thread to handle the response here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036] -> [2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]
>
> This happens when either `Consumer#position()` or `Consumer#poll()` gets called.
> The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. This results in `subscriptions.seek()` throwing an IllegalStateException.


