Posted to dev@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/07/11 18:48:00 UTC

[jira] [Resolved] (KAFKA-10247) Streams may attempt to process after closing a task

     [ https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-10247.
-------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Streams may attempt to process after closing a task
> ---------------------------------------------------
>
>                 Key: KAFKA-10247
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10247
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Observed in a system test. A corrupted task was detected, and Streams properly closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
>    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be re-initialized
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
>    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>    ... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing its state manager and all the registered state stores: {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, cntStoreName=StateStoreMetadata (cntStoreName : SmokeTest-cntStoreName-changelog-1 @ 0} (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-07-08 17:08:09,346] INFO [Consumer clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer, groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2, SmokeTest-sum-STATE-STORE-0000000050-changelog-0, SmokeTest-minStoreName-changelog-3, SmokeTest-sum-STATE-STORE-0000000050-changelog-2, SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-07-08 17:08:09,348] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state dir lock for task 2_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing record collector dirty (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask){code}
> However, records were already buffered for the task, so later in the same processing loop, Streams tried to process it, resulting in an InvalidStateStoreException:
> {code:java}
> [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to process stream task 2_1 due to the following error: (org.apache.kafka.streams.processor.internals.TaskManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
>    at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
>    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
>    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
>    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
>    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
>    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
>    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
>    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
>    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
>    at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
>    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
>    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
>    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
>    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
>    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
>    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
>    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
>    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
>    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
>    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread){code}
> This caused the entire stream thread to shut down.
>  
> Instead, we should not attempt to process tasks that are not running.
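
The guard proposed in the description can be sketched roughly as follows. This is only an illustration of the idea, not the actual change made for KAFKA-10247: the class RunningTaskFilter, the SketchTask type, its State enum, and the processRunning method are all hypothetical stand-ins and are not Kafka Streams classes. The sketch assumes each task tracks its own state and that the processing loop can consult it before handing the task a buffered record.

```java
import java.util.List;

// Hypothetical illustration only; none of these types are Kafka Streams classes.
final class RunningTaskFilter {

    // Assumed lifecycle states for the sketch.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class SketchTask {
        final String id;
        State state;
        int buffered;   // records still buffered for this task
        int processed;  // records processed so far

        SketchTask(String id, State state, int buffered) {
            this.id = id;
            this.state = state;
            this.buffered = buffered;
        }

        // Processes one buffered record; fails if the task is not running,
        // loosely mirroring the failure on a closed store in the logs above.
        boolean processOne() {
            if (state != State.RUNNING) {
                throw new IllegalStateException("task " + id + " is " + state);
            }
            if (buffered == 0) {
                return false;
            }
            buffered--;
            processed++;
            return true;
        }
    }

    // Processes at most one record per task and skips every task that is
    // not RUNNING, even if it still has buffered records.
    static int processRunning(List<SketchTask> tasks) {
        int processedCount = 0;
        for (SketchTask task : tasks) {
            if (task.state != State.RUNNING) {
                continue; // e.g. a task that was just closed dirty
            }
            if (task.processOne()) {
                processedCount++;
            }
        }
        return processedCount;
    }
}
```

With this check in the loop, a task that was closed dirty earlier in the same iteration keeps its buffered records untouched instead of failing the whole thread.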



--
This message was sent by Atlassian Jira
(v8.3.4#803005)