Posted to jira@kafka.apache.org by "amuthan Ganeshan (Jira)" <ji...@apache.org> on 2019/10/22 18:20:00 UTC

[jira] [Commented] (KAFKA-7181) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException

    [ https://issues.apache.org/jira/browse/KAFKA-7181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16957264#comment-16957264 ] 

amuthan Ganeshan commented on KAFKA-7181:
-----------------------------------------

h3. *I am using Kafka Streams 2.3.0, the latest version, and this bug still occurs. Could you please help me fix it? The scenario is described below for your review.*

 

I have a Kafka Streams application that stores incoming messages in a state store. Later, during punctuation, we process the messages and write them to a persistent big data store.

The application consumes from 120 partitions distributed across 40 instances. It had been running without any problem for months, but all of a sudden some of the instances failed because of a stream thread exception saying  

```java.lang.IllegalStateException: No current assignment for partition <app_name>-<store_name>-changelog-98```

 

The other instances are stuck in the REBALANCING state and never come out of it. Here is the full stack trace; I have masked the application name and store name because of an NDA.

 

```

2019-10-21 13:27:13,481 ERROR [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] Encountered the following error during processing:
java.lang.IllegalStateException: No current assignment for partition application.id-store_name-changelog-98
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

```

 

I checked the state store disk usage; it is less than 40% of the total disk space available. Restarting the application solves the problem for a short time, but the error soon pops up again at random on other instances. I tried changing the retries and retry.backoff.ms configuration, but that did not help at all:

```

retries = 2147483647

retry.backoff.ms

```
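For reference, a minimal sketch of how such client settings are scoped by prefix in a Streams configuration. It is not a fix. The prefix strings are assumed to mirror the Kafka Streams `producer.` and `restore.consumer.` prefixes and are written as plain strings so the sketch compiles without the Kafka jars; the class and helper names are hypothetical, and the restore-consumer value is only illustrative. The point is that a `producer.`-prefixed key is only handed to the producer, while the exception above is raised from a consumer poll in `maybeUpdateStandbyTasks`.

```java
import java.util.Properties;

public class ClientRetryOverrides {
    // Assumption: these mirror the Kafka Streams client prefixes.
    static final String PRODUCER_PREFIX = "producer.";
    static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";

    /** Sets key=value under the given client prefix (hypothetical helper). */
    static void override(Properties props, String prefix, String key, String value) {
        props.setProperty(prefix + key, value);
    }

    public static void main(String[] args) {
        final Properties props = new Properties();
        // Values taken from the configuration in this comment.
        override(props, PRODUCER_PREFIX, "retries", "2147483647");
        override(props, PRODUCER_PREFIX, "retry.backoff.ms", "60000");
        // Illustrative only: scoping the same backoff to the restore consumer.
        override(props, RESTORE_CONSUMER_PREFIX, "retry.backoff.ms", "60000");

        // Print the resulting keys in sorted order.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```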

After some searching I found that a similar bug was reported to the Kafka team in the past, and my stack trace matches the one in that report: the same IllegalStateException from SubscriptionState.assignedState, reached through StreamThread.maybeUpdateStandbyTasks.

Here is the link to that bug, which was reported about a year ago.

https://issues.apache.org/jira/browse/KAFKA-7181

 

Is there a workaround for this bug through configuration changes, or is there something wrong with the way I set up the application? The following is the configuration of my Streams application.

 

```

consumer.session.timeout.ms=30000
metric.reporters=org.apache.kafka.common.metrics.JmxReporter
replication.factor=3
metadata.max.age.ms=30000
max.partition.fetch.bytes=2000000
producer.retries=2147483647
bootstrap.servers= <bootstrap server list goes here>
metrics.recording.level=DEBUG
producer.retry.backoff.ms=60000
consumer.auto.offset.reset=latest
application.server=0.0.0.0:6063
num.standby.replicas=1
max.poll.records=2
group.initial.rebalance.delay.ms=30000
state.dir= <state dir path goes here>
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
num.stream.threads=10
application.id= <application id goes here>

```
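As a rough sanity check on this setup, the sketch below compares the number of tasks with the number of stream threads. It assumes a single sub-topology with one active task per input partition, which is not stated above; the class and method names are hypothetical. The inputs (120 partitions, 40 instances, num.stream.threads=10, num.standby.replicas=1) are the values given in this comment.

```java
public class StreamsCapacity {
    /** One active task per partition plus its standby copies (assumes one sub-topology). */
    static int totalTasks(int partitions, int standbyReplicas) {
        return partitions * (1 + standbyReplicas);
    }

    /** Stream threads across the whole deployment. */
    static int totalThreads(int instances, int threadsPerInstance) {
        return instances * threadsPerInstance;
    }

    /** Lower bound on threads that cannot hold any task, since each task runs on one thread. */
    static int minIdleThreads(int tasks, int threads) {
        return Math.max(0, threads - tasks);
    }

    public static void main(String[] args) {
        int tasks = totalTasks(120, 1);      // 240 active + standby tasks
        int threads = totalThreads(40, 10);  // 400 stream threads
        System.out.println("tasks=" + tasks
                + " threads=" + threads
                + " minIdleThreads=" + minIdleThreads(tasks, threads)); // 160
    }
}
```

Under that assumption, at least 160 of the 400 threads would hold neither an active nor a standby task.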

Note: The original bug, reported a year ago, was concluded to be related to https://issues.apache.org/jira/browse/KAFKA-7657 and was reported as fixed in version 2.2.0, but I am using the latest version, 2.3.0.

I appreciate your help concerning this bug.

> Kafka Streams State stuck in rebalancing after one of the StreamThread encounters IllegalStateException
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7181
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7181
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Romil Kumar Vasani
>            Priority: Major
>             Fix For: 2.2.0
>
>
> One of the StreamThreads encounters an IllegalStateException, is marked DEAD, and shuts down.
> The application doesn't spawn a new thread in its place; the partitions of that thread are assigned to a different thread and it synchronizes. But the application is stuck in REBALANCING state, as not all StreamThreads are in RUNNING state.
> Expected: A new thread should come up, and after synchronization/rebalancing the KafkaStreams.State should be RUNNING
> Since all the active threads (that are not marked DEAD) are in RUNNING state, the KafkaStreams.State should be RUNNING
> P.S. I am reporting an issue for the first time. If there is more information needed I can provide.
> Below are the logs from the IllegalStateException: 
> 2018-07-18 03:02:27.510 ERROR 1 — [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [prd1565.prod.nuke.ops.v1-StreamThread-2] Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-stateStore-changelog-10
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>  at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>  at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> 2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN
>  2018-07-18 03:02:27.511 INFO 1 — [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutting down
>  2018-07-18 03:02:27.571 INFO 1 — [-StreamThread-2] o.a.k.clients.producer.KafkaProducer : [Producer clientId=consumerGroup-StreamThread-2-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD
>  2018-07-18 03:02:27.579 INFO 1 — [-StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [consumerGroup-StreamThread-2] Shutdown complete
>  2018-07-18 03:02:27.579 ERROR 1 — [-StreamThread-2] xxx.xxx.xxx.AppRunner : Unhandled exception in thread: 43:consumerGroup-StreamThread-2
> java.lang.IllegalStateException: No current assignment for partition consumerGroup-inventoryStore-changelog-10
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:413)
>  at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:595)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:553)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>  at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1040)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:812)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
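
The behaviour the original report expects (the application state should be RUNNING once every thread that is not marked DEAD is RUNNING) can be sketched as below. This is only an illustration of that expectation, not Kafka's implementation: the enums are simplified, hypothetical stand-ins for the real thread and application states, and returning ERROR when no thread is left alive is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedAppState {
    // Simplified, hypothetical stand-ins for the real state enums.
    enum ThreadState { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN, DEAD }
    enum AppState { RUNNING, REBALANCING, ERROR }

    /** Derives the application state while ignoring threads that are already DEAD. */
    static AppState expectedState(List<ThreadState> threads) {
        boolean anyAlive = false;
        for (ThreadState t : threads) {
            if (t == ThreadState.DEAD) {
                continue; // a dead thread should not hold the application in REBALANCING
            }
            anyAlive = true;
            if (t != ThreadState.RUNNING) {
                return AppState.REBALANCING; // some live thread is still transitioning
            }
        }
        // Assumption: with no live thread left, the application is in ERROR.
        return anyAlive ? AppState.RUNNING : AppState.ERROR;
    }

    public static void main(String[] args) {
        List<ThreadState> threads = Arrays.asList(
                ThreadState.RUNNING, ThreadState.DEAD, ThreadState.RUNNING);
        System.out.println(expectedState(threads)); // prints RUNNING
    }
}
```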


