Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2023/03/15 10:38:00 UTC

[jira] [Assigned] (FLINK-31414) exceptions in the alignment timer are ignored

     [ https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-31414:
--------------------------------------

    Assignee: Feifan Wang

> exceptions in the alignment timer are ignored
> ---------------------------------------------
>
>                 Key: FLINK-31414
>                 URL: https://issues.apache.org/jira/browse/FLINK-31414
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.6, 1.14.6, 1.15.3, 1.16.1
>            Reporter: Feifan Wang
>            Assignee: Feifan Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> The alignment timer task in alternating aligned checkpoints runs as a future task in the mailbox thread, so exceptions thrown inside it ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327]) are ignored. These exceptions should fail the task. Instead, in my test, the same checkpoint ends up calling initInputsCheckpoint twice.
>  
> {code:java}
>  switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: unable to send request to worker
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
>         at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
>         at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
>         at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
>         at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
>         at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>         at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>         at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>         at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>         at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>         at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>         at java.lang.Thread.run(Thread.java:748)
>         Suppressed: java.io.IOException: java.lang.IllegalStateException: writer not found for request start 17
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
>                 at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
>                 at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
>                 at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
>                 at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
>                 at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
>                 at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
>                 at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>                 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>                 ... 3 more
>         Caused by: java.lang.IllegalStateException: writer not found for request start 17
>                 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>                 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>                 ... 1 more
> Caused by: java.lang.IllegalStateException: not running
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
>         ... 27 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found for request start 17] {code}
>  
>  
> See: [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]
>  
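
The general mechanism described above, an exception being lost when a callback runs as a future task, can be reproduced outside Flink. The sketch below is not Flink code: it uses the JDK's java.util.concurrent.FutureTask as a stand-in for the task submitted to the mailbox, and the class and method names (SwallowedTimerException, failingTimerTask, failureSeenByGet) are hypothetical. It only illustrates that run() records the exception inside the future instead of throwing it, so the failure stays invisible unless something calls get().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SwallowedTimerException {

    // Hypothetical stand-in for the alignment timer callback; not Flink code.
    public static FutureTask<Void> failingTimerTask() {
        return new FutureTask<>(() -> {
            throw new IllegalStateException("simulated failure in timer callback");
        });
    }

    // Returns the failure message that only get() reveals, or null if there is none.
    public static String failureSeenByGet(FutureTask<Void> task) {
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            // The original exception is wrapped and stored inside the future.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        FutureTask<Void> task = failingTimerTask();

        // Stand-in for the executing thread: run() completes without throwing.
        task.run();
        System.out.println("run() returned normally; isDone=" + task.isDone());

        // The failure surfaces only when the result is inspected explicitly.
        System.out.println("visible only via get(): " + failureSeenByGet(task));
    }
}
```

If nothing inspects the future after it runs, the caller cannot tell that the callback failed, which matches the symptom reported here. How the actual fix propagates the failure to the task is not shown in this message.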


