Posted to dev@kafka.apache.org by "Stanislav Kozlovski (JIRA)" <ji...@apache.org> on 2019/02/28 12:45:00 UTC

[jira] [Created] (KAFKA-8016) Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group

Stanislav Kozlovski created KAFKA-8016:
------------------------------------------

             Summary: Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
                 Key: KAFKA-8016
                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
             Project: Kafka
          Issue Type: Improvement
            Reporter: Stanislav Kozlovski
            Assignee: Stanislav Kozlovski


I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also had this message in a close timeframe:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
 

After investigating, I see that there might be a race condition (see [ConsumerNetworkClient.java#L247|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]):

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the Heartbeat thread to handle the response here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]. This happens when either `Consumer#position()` or `Consumer#poll()` is called.
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the Heartbeat thread is handling the offset response. If partition X is revoked before the response callback runs, `subscriptions.seek()` finds no assignment for X and throws the IllegalStateException shown above.
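
To make the suspected interleaving concrete, here is a minimal sketch that replays it in a fixed order. It does not use the Kafka client at all: `HeartbeatRaceSketch`, `ToySubscriptionState`, `replay()` and the thread name are hypothetical names made up for illustration, and the toy `assign()`/`seek()` methods only imitate the behaviour visible in the stack trace. The real code paths are more involved, so treat this as a model of the hypothesis rather than a reproduction.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class HeartbeatRaceSketch {

    /** Hypothetical, heavily simplified stand-in for SubscriptionState. */
    static class ToySubscriptionState {
        private final Map<String, Long> positions = new HashMap<>();

        /** Replaces the assignment; partitions not in the new set lose their state. */
        synchronized void assign(Set<String> partitions) {
            positions.keySet().retainAll(partitions);
            for (String p : partitions) {
                positions.putIfAbsent(p, -1L);
            }
        }

        /** Imitates the failure in the stack trace: seeking an unassigned partition throws. */
        synchronized void seek(String partition, long offset) {
            if (!positions.containsKey(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            positions.put(partition, offset);
        }
    }

    /** Replays the interleaving; returns the exception message, or null if seek succeeded. */
    static String replay() {
        ToySubscriptionState subscriptions = new ToySubscriptionState();
        subscriptions.assign(Collections.singleton("X"));

        // Step 1 (application thread): an offset reset for X is in flight.
        // This callback stands in for the offset response handler.
        Runnable onListOffsetResponse = () -> subscriptions.seek("X", 32110985L);

        // Step 2 (application thread): the rebalance begins and the
        // onJoinPrepare stand-in revokes every partition.
        subscriptions.assign(Collections.<String>emptySet());

        // Step 3 (toy heartbeat thread): the response arrives and the
        // callback runs against the now-empty assignment.
        AtomicReference<String> failure = new AtomicReference<>();
        Thread heartbeat = new Thread(() -> {
            try {
                onListOffsetResponse.run();
            } catch (IllegalStateException e) {
                failure.set(e.getMessage());
            }
        }, "toy-heartbeat-thread");
        heartbeat.start();
        try {
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "No current assignment for partition X"
    }
}
```

Note that both toy methods are `synchronized` and the exception still occurs. In this model the problem is the ordering of revocation and callback, not a torn write, so locking each call individually would not be enough to avoid it.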


