You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/03/19 20:38:00 UTC

[jira] [Updated] (KAFKA-8062) StateListener is not notified when StreamThread dies

     [ https://issues.apache.org/jira/browse/KAFKA-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8062:
-----------------------------------
    Fix Version/s: 2.2.1
                   2.3.0

> StateListener is not notified when StreamThread dies
> ----------------------------------------------------
>
>                 Key: KAFKA-8062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8062
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>         Environment: Kafka 2.1.1, kafka-streams-scala version 2.1.1
>            Reporter: Andrey Volkov
>            Assignee: Guozhang Wang
>            Priority: Minor
>             Fix For: 2.3.0, 2.2.1
>
>
> I want my application to react when streams die. Trying to use KafkaStreams.setStateListener. Also checking KafkaStreams.state() from time to time.
> The test scenario: Kafka is available, but there are no topics that my Topology is supposed to use.
> I expect streams to crash and the state listener to be notified about that, with the new state ERROR. KafkaStreams.state() should also return ERROR.
> In reality the streams crash, but the KafkaStreams.state() method always returns REBALANCING and the last time the StateListener was called, the new state was also REBALANCING. 
>  
> I believe the reason for this is in the methods:
> org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange() which does not react on the state StreamsThread.State.PENDING_SHUTDOWN
> and
> org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned, which calls shutdown() setting the state to PENDING_SHUTDOWN and then
> streamThread.setStateListener(null) effectively removing the state listener, so that the DEAD state of the thread never reaches KafkaStreams object.
> Here is an extract from the logs:
> {{14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] ERROR o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] test-input-topic is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.}}
> {{14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Successfully joined group with generation 1}}
> {{14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Setting newly assigned partitions []}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut down}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN}}
> {{14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.KafkaConsumer - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.p.KafkaProducer - [Producer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete}}
> After this calls to KafkaStreams.state() still return REBALANCING
> There is a workaround with requesting KafkaStreams.localThreadsMetadata() and checking each thread's state manually, but that seems very wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)