Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/05/18 17:41:00 UTC

[jira] [Resolved] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

     [ https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-9994.
---------------------------------
    Resolution: Fixed

> Catch TaskMigrated exception in task corruption code path 
> ----------------------------------------------------------
>
>                 Key: KAFKA-9994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> We have seen a case where a TaskMigratedException was thrown from taskManager.commit() and shut down the stream thread. It should be caught and handled instead.
> According to the stack trace, the exception was thrown from the prepareCommit() call made by the commit that runs while handling a corrupted-task exception.
> {code:java}
> [2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 12:47:25,635] ERROR [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to send a record [stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task [1_1]]; it means all tasks belonging to this thread should be migrated.
>         at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>         at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
>         at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
>         at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
>         at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>         at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>         at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
>         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
>         at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> {code}
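
The failure mode described above can be illustrated with a minimal, self-contained sketch. It is not the actual patch: every class, interface, and method name below (including the local TaskMigratedException and TaskCorruptedException stand-ins, commitNonCorruptedTasks, reviveCorruptedTasks, and handleLostAll) is hypothetical and only mimics the shape of the problem. The assumption is that the corrupted-task handler itself performs a commit, so that commit needs its own guard against the producer being fenced.

```java
import java.util.ArrayList;
import java.util.List;

public class CorruptionPathSketch {
    // Hypothetical stand-ins, NOT the real Kafka Streams exception classes.
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message) { super(message); }
    }

    static class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(String message) { super(message); }
    }

    // Hypothetical, simplified view of the operations the run loop relies on.
    interface TaskManager {
        void process();
        void commitNonCorruptedTasks();
        void reviveCorruptedTasks();
        void handleLostAll();
    }

    // One iteration of a simplified run loop; returns the steps that completed.
    static List<String> runOnce(TaskManager taskManager) {
        List<String> events = new ArrayList<>();
        try {
            taskManager.process();
            events.add("processed");
        } catch (TaskCorruptedException corrupted) {
            try {
                // The commit inside the corruption handler can itself hit a
                // fenced producer, so it is guarded separately.
                taskManager.commitNonCorruptedTasks();
                events.add("committed");
                taskManager.reviveCorruptedTasks();
                events.add("revived");
            } catch (TaskMigratedException migrated) {
                // Handle the migration instead of letting the thread die.
                taskManager.handleLostAll();
                events.add("migrated-handled");
            }
        } catch (TaskMigratedException migrated) {
            taskManager.handleLostAll();
            events.add("migrated-handled");
        }
        return events;
    }

    // Test double: optionally fails in process() and/or in the commit.
    static TaskManager fakeManager(final boolean corruptOnProcess, final boolean fencedOnCommit) {
        return new TaskManager() {
            @Override public void process() {
                if (corruptOnProcess) {
                    throw new TaskCorruptedException("task corrupted");
                }
            }
            @Override public void commitNonCorruptedTasks() {
                if (fencedOnCommit) {
                    throw new TaskMigratedException("producer got fenced");
                }
            }
            @Override public void reviveCorruptedTasks() { }
            @Override public void handleLostAll() { }
        };
    }

    public static void main(String[] args) {
        System.out.println(runOnce(fakeManager(false, false)));
        System.out.println(runOnce(fakeManager(true, false)));
        System.out.println(runOnce(fakeManager(true, true)));
    }
}
```

Without the inner try/catch, the third case would propagate the migration exception out of the loop, which matches the thread shutdown seen in the log above.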



--
This message was sent by Atlassian Jira
(v8.3.4#803005)