You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Bastien DINE (Jira)" <ji...@apache.org> on 2022/10/04 07:26:00 UTC

[jira] [Comment Edited] (FLINK-25920) Allow receiving updates of CommittableSummary

    [ https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612499#comment-17612499 ] 

Bastien DINE edited comment on FLINK-25920 at 10/4/22 7:25 AM:
---------------------------------------------------------------

I encountered the same issue, but with aligned checkpoint : 

My job is stuck, how can I fix this error ?

Checkpointing conf :
||*Option*||*Value*||
|Checkpointing Mode|Exactly Once|
|Checkpoint Storage|FileSystemCheckpointStorage|
|State Backend|EmbeddedRocksDBStateBackend|
|Interval|10s|
|Timeout|10m 0s|
|Minimum Pause Between Checkpoints|10s|
|Maximum Concurrent Checkpoints|1|
|Unaligned Checkpoints|Disabled|
|Persist Checkpoints Externally|Enabled (retain on cancellation)|
|Tolerable Failed Checkpoints|0|
|Checkpoints With Finished Tasks|Enabled|

 

Exception :
java.io.IOException: Could not perform checkpoint 575427 for operator Operator
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:224)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:204)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
    ... 22 more
Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 35 more


was (Author: bdine):
I encountered the same issue, but with aligned checkpoint : 

My job is stuck, how can I fix this error ?

Checkpointing conf :
||*Option*||*Value*||
|Checkpointing Mode|Exactly Once|
|Checkpoint Storage|FileSystemCheckpointStorage|
|State Backend|EmbeddedRocksDBStateBackend|
|Interval|10s|
|Timeout|10m 0s|
|Minimum Pause Between Checkpoints|10s|
|Maximum Concurrent Checkpoints|1|
|Unaligned Checkpoints|Disabled|
|Persist Checkpoints Externally|Enabled (retain on cancellation)|
|Tolerable Failed Checkpoints|0|
|Checkpoints With Finished Tasks|Enabled|

 

Exception :
java.io.IOException: Could not perform checkpoint 575427 for operator Calcul Etats Alimentation -> (Sink: Sink Neo4J, Transformation des états d'alimentation en notifications -> Kafka Notifications Coupures Sink: Writer -> Kafka Notifications Coupures Sink: Committer) (1/1)#8607.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emit(SinkWriterOperator.java:224)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:204)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
    ... 22 more
Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 35 more

> Allow receiving updates of CommittableSummary
> ---------------------------------------------
>
>                 Key: FLINK-25920
>                 URL: https://issues.apache.org/jira/browse/FLINK-25920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Common
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Fabian Paul
>            Priority: Major
>
> In the case of unaligned checkpoints, it might happen that the checkpoint barrier overtakes the records and an empty committable summary is emitted that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)