Posted to jira@kafka.apache.org by "Boyang Chen (Jira)" <ji...@apache.org> on 2020/05/17 18:06:00 UTC

[jira] [Created] (KAFKA-10010) Should close standby task for safety during HandleLostAll

Boyang Chen created KAFKA-10010:
-----------------------------------

             Summary: Should close standby task for safety during HandleLostAll
                 Key: KAFKA-10010
                 URL: https://issues.apache.org/jira/browse/KAFKA-10010
             Project: Kafka
          Issue Type: Bug
            Reporter: Boyang Chen
            Assignee: Boyang Chen


The current handleLostAll logic doesn't close standby tasks, which can lead to a tricky condition like the following:

1. The standby task was initializing in `CREATED` state, and a TaskCorruptedException was thrown from registerStateStores.

2. The TaskCorruptedException was caught, and a commit of the non-affected tasks was attempted.

3. The commit failed with a TaskMigratedException.

4. handleLostAll did not close the standby task, so it was left in `CREATED` state.

5. After the next rebalance completed, the same task was assigned back as a standby task.

6. An IllegalArgumentException was thrown while re-initializing the standby task, and the stream thread shut down:
{code:java}
[2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 18:56:18,050] ERROR [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) java.lang.IllegalArgumentException: stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already been registered.
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
        at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
        at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}
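The sequence above can be reproduced with a small self-contained model. This is a sketch under stated assumptions, not Kafka Streams code: `StateManager`, `StandbyTask`, `TaskManager`, `runScenario` and the `closeStandbysOnLostAll` flag are hypothetical, heavily simplified stand-ins. The model assumes that the store was registered before the corruption was detected, and that a task object which was never closed is reused when it is assigned again. With the flag off, re-initialization hits the same "has already been registered" error seen in the log. With the flag on, handleLostAll closes the standby task as the summary proposes, the stale registration is dropped, and a fresh task initializes cleanly.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the KAFKA-10010 scenario; not the real Kafka Streams classes.
class HandleLostAllSketch {

    enum State { CREATED, RUNNING, CLOSED }

    static final class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(String msg) { super(msg); }
    }

    // Stand-in state manager: rejects a second registration of the same store.
    static final class StateManager {
        private final Set<String> registered = new HashSet<>();

        void registerStore(String taskId, String storeName) {
            if (!registered.add(storeName)) {
                throw new IllegalArgumentException("standby-task [" + taskId + "] Store "
                        + storeName + " has already been registered.");
            }
        }

        void close() { registered.clear(); }
    }

    static final class StandbyTask {
        final String id;
        final StateManager stateManager;
        State state = State.CREATED;

        StandbyTask(String id, StateManager stateManager) {
            this.id = id;
            this.stateManager = stateManager;
        }

        // Registers the store; optionally simulates corruption detected after registration.
        void initializeIfNeeded(String storeName, boolean corruptAfterRegister) {
            if (state == State.CREATED) {
                stateManager.registerStore(id, storeName);
                if (corruptAfterRegister) {
                    throw new TaskCorruptedException("task " + id + " corrupted"); // stays CREATED
                }
                state = State.RUNNING;
            }
        }

        void closeDirty() {
            stateManager.close(); // drops the stale store registration
            state = State.CLOSED;
        }
    }

    static final class TaskManager {
        final Map<String, StandbyTask> standbyTasks = new HashMap<>();
        final boolean closeStandbysOnLostAll;

        TaskManager(boolean closeStandbysOnLostAll) { this.closeStandbysOnLostAll = closeStandbysOnLostAll; }

        // An unclosed task object is reused; otherwise a fresh one is created.
        void assignStandby(String taskId) {
            standbyTasks.computeIfAbsent(taskId, id -> new StandbyTask(id, new StateManager()));
        }

        void handleLostAll() {
            // Closing active tasks is omitted; only the standby handling matters here.
            if (closeStandbysOnLostAll) {
                for (StandbyTask task : standbyTasks.values()) task.closeDirty();
                standbyTasks.clear();
            }
        }

        void tryToCompleteRestoration(String storeName) {
            for (StandbyTask task : standbyTasks.values()) task.initializeIfNeeded(storeName, false);
        }
    }

    // Returns "OK" if re-initialization succeeds, otherwise the exception message.
    static String runScenario(boolean closeStandbysOnLostAll) {
        String store = "KSTREAM-AGGREGATE-STATE-STORE-0000000007";
        TaskManager taskManager = new TaskManager(closeStandbysOnLostAll);
        taskManager.assignStandby("1_2");
        try {
            taskManager.standbyTasks.get("1_2").initializeIfNeeded(store, true); // step 1
        } catch (TaskCorruptedException e) {
            // Steps 2-3: the commit fails with TaskMigratedException, so we go to handleLostAll.
        }
        taskManager.handleLostAll();      // step 4
        taskManager.assignStandby("1_2"); // step 5
        try {
            taskManager.tryToCompleteRestoration(store); // step 6
            return "OK";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("standbys kept on lost-all:   " + runScenario(false));
        System.out.println("standbys closed on lost-all: " + runScenario(true));
    }
}
{code}

In this model the first line printed ends with "has already been registered." and the second line prints "OK". Closing the task, rather than only resetting its state, is the choice that clears the registrations held by its state manager.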



--
This message was sent by Atlassian Jira
(v8.3.4#803005)