Posted to issues@flink.apache.org by "Arvid Heise (Jira)" <ji...@apache.org> on 2020/12/18 16:49:00 UTC

[jira] [Commented] (FLINK-20654) Unaligned checkpoint recovery may lead to corrupted data stream

    [ https://issues.apache.org/jira/browse/FLINK-20654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251864#comment-17251864 ] 

Arvid Heise commented on FLINK-20654:
-------------------------------------

For my investigation, I added a number of INFO log statements to track which buffers are written and recovered.
https://github.com/AHeise/flink/tree/FLINK-20654

For [Parallel union, p = 5], I noticed that the issue arises only when multiple buffers of the same channel are recovered.


{noformat}
19749 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 8 bytes
19749 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=1, inputChannelIdx=3} recovered 9 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=2, inputChannelIdx=3} recovered 1 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=3, inputChannelIdx=3} recovered 14 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes
19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4 bytes
19750 [Flat Map (4/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=1, inputChannelIdx=3} prepareSnapshot 9 bytes
19750 [Flat Map (4/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=2, inputChannelIdx=3} prepareSnapshot 1 bytes
19750 [Flat Map (4/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=3, inputChannelIdx=3} prepareSnapshot 14 bytes
19750 [Flat Map (1/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=0, inputChannelIdx=0} prepareSnapshot 17 bytes
19750 [Flat Map (1/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=1, inputChannelIdx=0} prepareSnapshot 13 bytes
19750 [Flat Map (1/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=2, inputChannelIdx=0} prepareSnapshot 18 bytes
19750 [Flat Map (1/5)#5] INFO  org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} prepareSnapshot 14 bytes
19751 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19751 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Channel state writer Flat Map (4/5)#5] INFO  org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map (4/5)#5 discarding 0 drained requests
19752 [Flat Map (1/5)#5] INFO  org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - InputChannelInfo{gateIdx=3, inputChannelIdx=0} maybePersist 4096 bytes
19752 [Flat Map (4/5)#5] INFO  org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map (4/5)#5 discarding 1 drained requests
19753 [Source: source1 (1/5)#5] INFO  org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - Snapshotted LongSplit{increment=5, nextNumber=21525, numCompletedCheckpoints=4} @ 0 subtask (5 attempt)
19753 [Source: source1 (4/5)#5] INFO  org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase [] - Snapshotted LongSplit{increment=5, nextNumber=21528, numCompletedCheckpoints=4} @ 3 subtask (5 attempt)
19753 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=0, inputChannelIdx=2} recovered 2 bytes
19753 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=1, inputChannelIdx=2} recovered 12 bytes
19753 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map (4/5)#5 - asynchronous part of checkpoint 11 could not be completed.
java.util.concurrent.CancellationException: null
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_222]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_222]
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:583) ~[classes/:?]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:59) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:115) [classes/:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
19753 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, inputChannelIdx=2} recovered 4 bytes
19753 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=3, inputChannelIdx=2} recovered 14 bytes
19753 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (3/5)#5 (0bf62fa35c5a671073338ea1f1730953)/InputChannelInfo{gateIdx=2, inputChannelIdx=2} recovered 4096 bytes
19753 [Flat Map (4/5)#5] WARN  org.apache.flink.runtime.taskmanager.Task [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79) switched from RUNNING to FAILED.
java.lang.ArithmeticException: integer overflow
	at java.lang.Math.toIntExact(Math.java:1011) ~[?:1.8.0_222]
	at java.lang.StrictMath.toIntExact(StrictMath.java:813) ~[?:1.8.0_222]
	at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:414) ~[test-classes/:?]
	at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase$CountingMapFunction.flatMap(UnalignedCheckpointITCase.java:401) ~[test-classes/:?]
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:185) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158) ~[classes/:?]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[classes/:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [classes/:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [classes/:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
{noformat}
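
To check the "multiple buffers of the same channel" observation against a longer log, the recovered buffers can be tallied per channel. The following is only a rough sketch: the class name RecoveredBufferTally, the method countBuffersPerChannel and the regular expression are made up for illustration and assume the exact message layout of the ad-hoc "recovered N bytes" statements shown above; none of it is part of Flink.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecoveredBufferTally {
    // Assumed layout: "... - <task> (<attempt id>)/InputChannelInfo{gateIdx=G, inputChannelIdx=C} recovered N bytes"
    private static final Pattern RECOVERED = Pattern.compile(
            "- (.+?) \\([0-9a-f]+\\)/InputChannelInfo\\{gateIdx=(\\d+), inputChannelIdx=(\\d+)\\} recovered (\\d+) bytes");

    // Counts how many "recovered" lines (i.e. buffers) were logged per task and channel.
    static Map<String, Integer> countBuffersPerChannel(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = RECOVERED.matcher(line);
            if (m.find()) {
                String key = m.group(1) + " gate=" + m.group(2) + " channel=" + m.group(3);
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three lines copied from the log above; a real run would read the whole log file.
        List<String> lines = Arrays.asList(
                "19749 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 8 bytes",
                "19749 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=1, inputChannelIdx=3} recovered 9 bytes",
                "19750 [channel-state-unspilling-thread-1] INFO  org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel [] - Flat Map (4/5)#5 (de5cfe0d40797af545b28a5c2994ca79)/InputChannelInfo{gateIdx=0, inputChannelIdx=3} recovered 4096 bytes");
        // Report only channels for which more than one buffer was recovered.
        countBuffersPerChannel(lines).forEach((channel, n) -> {
            if (n > 1) {
                System.out.println(channel + ": " + n + " buffers");
            }
        });
    }
}
```

In the log excerpt above, the failing subtask Flat Map (4/5)#5 recovers 11 buffers for gateIdx=0, inputChannelIdx=3 (8 bytes, nine times 4096 bytes, 4 bytes), while its other three channels recover a single buffer each.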


> Unaligned checkpoint recovery may lead to corrupted data stream
> ---------------------------------------------------------------
>
>                 Key: FLINK-20654
>                 URL: https://issues.apache.org/jira/browse/FLINK-20654
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Arvid Heise
>            Priority: Major
>             Fix For: 1.13.0, 1.12.1
>
>
> Fix of FLINK-20433 shows potential corruption after recovery for all variations of UnalignedCheckpointITCase.
> To reproduce, run UCITCase a couple hundred times. The issue showed for me in:
> - execute [Parallel union, p = 5]
> - execute [Parallel union, p = 10]
> - execute [Parallel cogroup, p = 5]
> - execute [parallel pipeline with remote channels, p = 5]
> with decreasing frequency.
> The issue manifests as one of the following:
> - stream corrupted exception
> - EOF exception
> - assertion failure in NUM_LOST or NUM_OUT_OF_ORDER
> - (for union) ArithmeticException overflow (because the number that should be [0;100000] has been mis-deserialized)
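
The last symptom can be illustrated in isolation: if a reader starts decoding a record at the wrong byte offset, a small number turns into a huge one and Math.toIntExact throws. The sketch below is an assumption-laden toy and not Flink's serializer: it encodes plain big-endian longs with java.nio.ByteBuffer, and the class MisalignedReadDemo, its helper methods and the sample values are hypothetical.

```java
import java.nio.ByteBuffer;

public class MisalignedReadDemo {
    // Writes the values back to back as 8-byte big-endian longs (assumed toy format).
    static byte[] serialize(long... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * Long.BYTES);
        for (long v : values) {
            buf.putLong(v);
        }
        return buf.array();
    }

    // Decodes one long starting at the given byte offset, whether or not it is a record boundary.
    static long readLongAt(byte[] data, int offset) {
        return ByteBuffer.wrap(data).getLong(offset);
    }

    // True if the value survives Math.toIntExact, false if it overflows.
    static boolean fitsInt(long value) {
        try {
            Math.toIntExact(value);
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(21525L, 21530L);
        long aligned = readLongAt(data, 0); // starts on a record boundary
        long shifted = readLongAt(data, 4); // starts 4 bytes into the first record
        System.out.println("aligned=" + aligned + " fitsInt=" + fitsInt(aligned));
        System.out.println("shifted=" + shifted + " fitsInt=" + fitsInt(shifted));
    }
}
```

Here the shifted read combines the low four bytes of the first value with the high four bytes of the second, which gives 21525 * 2^32 and no longer fits into an int.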



--
This message was sent by Atlassian Jira
(v8.3.4#803005)