Posted to issues@flink.apache.org by "Xie Yi (Jira)" <ji...@apache.org> on 2022/10/31 12:55:00 UTC

[jira] [Updated] (FLINK-29816) Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception

     [ https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xie Yi updated FLINK-29816:
---------------------------
    Description: 
h4. 1. How to reproduce

Use a ProcessWindowFunction and throw an exception in process().
Test code:
{code:java}
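import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KafkaCheckpointWindowProcessTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 60 s with exactly-once semantics and a 60 s timeout.
        env.enableCheckpointing(60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                .setBootstrapServers("****")
                .setTopics("****")
                .setGroupId("****")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Key by the record itself and fire a processing-time window every 15 s.
        SingleOutputStreamOperator<String> mapSource = kafkaSource.keyBy(s -> s).window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
                        // Processing the event "abc" throws java.lang.NumberFormatException.
                        Integer intS = Integer.valueOf(s);
                        collector.collect(s);
                    }
                })
                .name("name-process").uid("uid-process");

        mapSource.print();
        env.execute();
    }
}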
{code}
Kafka input events:
{code:java}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}
h4. 2. Fault phenomena

When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. The job should then keep restarting and failing over.
However, it failed over only twice (attempt 0 and attempt 1). On the third attempt it ran normally and reported no exception.
!image-2022-10-31-19-54-12-546.png!
h4. 3. Possible reasons

Attempt 2 was restored from a checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}
*Stack trace in the third attempt*
The user function was called in StreamTask.restore (the subtask state was INITIALIZING):
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}
Stack trace (which caused the failover) in attempt 0 and attempt 1
The user function was called in StreamTask.invoke:
{code:java}
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}
In org.apache.flink.streaming.runtime.tasks.StreamTask#handleAsyncException,
StreamTask only handles the asynchronous exception when isRunning == true:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1540]
{code:java}
    @Override
    public void handleAsyncException(String message, Throwable exception) {
        if (isRunning) {
            // only fail if the task is still running
            asyncExceptionHandler.handleAsyncException(message, exception);
        }
    }
{code}
However, during restore, isRunning == false:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L673]

 

So during StreamTask.restore, StreamTask skips the exception thrown by the user function of the ProcessWindowFunction.
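
To illustrate the suspected mechanism in isolation, here is a minimal sketch that does not use Flink at all. ToyTask, setRunning and reportedFailures are hypothetical names made up for this example; only the shape of the isRunning guard is taken from the handleAsyncException snippet quoted above. It assumes the analysis above is correct, i.e. that the timer callback's exception reaches this guard while the flag is still false.

```java
import java.util.ArrayList;
import java.util.List;

public class AsyncExceptionGuardDemo {

    /** Hypothetical stand-in for a task; this is NOT Flink's StreamTask. */
    static class ToyTask {
        // false while the task is still "restoring", true once it is "invoked"
        private volatile boolean isRunning = false;
        // failures that actually reached the (toy) failure handler
        private final List<Throwable> reportedFailures = new ArrayList<>();

        void setRunning(boolean running) {
            this.isRunning = running;
        }

        // Same guard shape as the quoted handleAsyncException:
        // the exception is forwarded only while the task is running.
        void handleAsyncException(String message, Throwable exception) {
            if (isRunning) {
                reportedFailures.add(new RuntimeException(message, exception));
            }
        }

        List<Throwable> reportedFailures() {
            return reportedFailures;
        }
    }

    public static void main(String[] args) {
        ToyTask task = new ToyTask();

        // "Restore" phase: the user-function exception is silently dropped.
        task.handleAsyncException("timer callback failed", new NumberFormatException("abc"));
        System.out.println("reported during restore: " + task.reportedFailures().size());

        // "Invoke" phase: the same exception is now reported and would trigger a failover.
        task.setRunning(true);
        task.handleAsyncException("timer callback failed", new NumberFormatException("abc"));
        System.out.println("reported while running: " + task.reportedFailures().size());
    }
}
```

In this toy model the first call leaves the failure list empty and the second adds one entry, which matches the observed behaviour: attempts 0 and 1 fail over, while the attempt that fires the window during restore does not.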

 

 
> Userfunction exception in ProcessWindowFunction was called before invoke during restore state(subtask was in INITIALIZING state), but SteamTask skip handle Exception
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29816
>                 URL: https://issues.apache.org/jira/browse/FLINK-29816
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: Xie Yi
>            Priority: Major
>         Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png
>
>


