Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2022/07/04 16:39:00 UTC

[jira] [Commented] (FLINK-28357) Watermark issue when recovering Finished sources

    [ https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17562234#comment-17562234 ] 

Piotr Nowojski commented on FLINK-28357:
----------------------------------------

Thanks for the bug report. Indeed, there was a bug where the max watermark was being swallowed by this non-chained map.

Just note that even with the bug fix (please see my PR), you will still not see the printed message from the short lived source:
{noformat}
System.out.println(String.format("%s: ShortLivedEmptySource emitting Long.MAX_VALUE watermark.", DateTime.now()));
{noformat}
because after recovery this source is never started, so the code never reaches the run() method. That is fine: MAX_WATERMARK is emitted by the framework.
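
To illustrate the point above, here is a toy model in plain Java. It is only a sketch under stated assumptions: `FinishedSourceRecoveryModel`, `ShortLivedSource`, `startTask` and the `restoredAsFinished` flag are hypothetical names made up for this example and are not Flink classes or APIs. It only mirrors the behaviour described in this comment: a source restored as finished does not have run() called again, yet the downstream still receives MAX_WATERMARK.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these types are Flink classes. */
final class FinishedSourceRecoveryModel {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Hypothetical stand-in for a user source whose run() logs and emits the final watermark. */
    static final class ShortLivedSource {
        int runInvocations = 0;

        void run(List<Long> downstream) {
            runInvocations++;
            System.out.println("ShortLivedEmptySource emitting Long.MAX_VALUE watermark.");
            downstream.add(MAX_WATERMARK);
        }
    }

    /**
     * Starts (or restores) the source task and returns the watermarks sent downstream.
     * Assumption of this model: when the task is restored as finished, run() is skipped
     * and the "framework" side emits MAX_WATERMARK on the source's behalf.
     */
    static List<Long> startTask(ShortLivedSource source, boolean restoredAsFinished) {
        List<Long> downstream = new ArrayList<>();
        if (restoredAsFinished) {
            downstream.add(MAX_WATERMARK); // emitted without invoking user code
        } else {
            source.run(downstream);        // user code prints and emits
        }
        return downstream;
    }

    public static void main(String[] args) {
        ShortLivedSource source = new ShortLivedSource();
        System.out.println("fresh start:          " + startTask(source, false));
        System.out.println("restored as finished: " + startTask(source, true));
        System.out.println("run() invocations:    " + source.runInvocations);
    }
}
```

In this model the console message appears only on the fresh start, while the downstream sees the same final watermark in both cases. That is why a missing console message is not, by itself, evidence that the watermark was lost.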


> Watermark issue when recovering Finished sources
> ------------------------------------------------
>
>                 Key: FLINK-28357
>                 URL: https://issues.apache.org/jira/browse/FLINK-28357
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>         Environment: This can be reproduced in an IDE with the attached sample program.
>            Reporter: James
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: WatermarkDemoMain.java, image-2022-07-01-16-18-14-768.png, longExample.txt
>
>
> Copied mostly from email trail on the flink user mailing list:
> I have done a lot of experimentation and I’m convinced there is a problem with how Flink handles Finished sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a message to the console and returns.
>  * The “Short Lived Source” feeds into a map() and is then joined with the “Long Running Source” by a KeyedCoProcessFunction. The “Short Lived Source” is moved to the “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from the map() in some situations after a recovery. The dashboard goes from showing this:
> [dashboard screenshot embedded in the original email via an Outlook attachment link]
> To the below after a recovery (with the currentInput1/2Watermark metrics showing input 2 having not received a watermark from the map, saying –Long.MAX_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. From experimenting with a 70-second interval, it seems that if only one checkpoint has been taken with the “Short Lived Source” in a FINISHED state since the last recovery, then everything works fine: the restarted “Short Lived Source” emits its watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE watermark” message on the console, meaning run() definitely executed. However, if 2 or more checkpoints are taken since the last recovery with the source in a FINISHED state, then the console message does not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source or Flink if I see two or more checkpoints logged in between recoveries. If zero or one checkpoints are made, everything is fine – the join gets the watermark and I see my console message. You can play with the checkpointing frequency as per the code comments:
>         // Useful checkpoint interval options:
>         //    5 - see the problem after the first recovery
>         //   70 - useful to see bad behaviour kick in after a recovery or two
>         //  120 - won't see the problem as we don't have 2 checkpoints within a single recovery session
> If I merge the Triggering/Completed checkpoint messages in the log with my console output I see something like this clearly showing the “Short Lived Source” run() method is not executed after 2 checkpoints with the operators marked as FINISHED:
>  
> 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.
> 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial watermark=1656503551268
> 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop watermark=1656503561302
> 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop watermark=1656503571302
> 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop watermark=1656503581303
> 2022-06-29 11:53:02.772 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:02.870 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop watermark=1656503591303
> 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop watermark=1656503601304
> 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------
> 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial watermark=1656503602405
> 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.
> 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop watermark=1656503612406
> 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop watermark=1656503622406
> 2022-06-29 11:53:51.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:51.067 INFO  [jobmanager-io-thread-4] o.a.f.r.c.CheckpointCoordinator     Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop watermark=1656503632407
> 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop watermark=1656503642407
> 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop watermark=1656503652408
> 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop watermark=1656503662408
> 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop watermark=1656503672409
> 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop watermark=1656503682409
> 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop watermark=1656503692410
> 2022-06-29 11:55:01.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:55:01.057 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop watermark=1656503702410
> 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------
> 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial watermark=1656503703444       <<<<< NO “ShortLivedEmptySource” message after recovery
> 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop watermark=1656503713445
> 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop watermark=1656503723446
> 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop watermark=1656503733446
>  
> I have also attached a longer example which shows everything working fine after 5 recoveries, and then breaking after the 6th.
> I am guessing here it has something to do with the checkpointing and recovery of a FINISHED source.
> Finally, here are some changes that allow the code to work:
>  * Change the code so the “Short Lived Source” doesn’t return from run() and stays RUNNING (uncomment the Thread.sleep)
>  * As I mentioned before, if I remove the map() operator the problem in the join also goes away. (I don’t see the console output but the join is happy)
>  * Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have two checkpoints with FINISHED state per recovery.
> The fact these changes prevent the issue means I really think there’s some bug or inconsistency here – if somebody could explain I would really appreciate it.
>  
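
For readers wondering why a missing watermark on one input stalls the join described in the report, here is a simplified sketch in plain Java. It assumes that a two-input operator advances its event time to the minimum of the latest watermarks seen on each input, and that an input which has received nothing sits at the lowest possible value. `TwoInputWatermarkModel` and its methods are hypothetical names for this illustration, not Flink's implementation.

```java
/** Simplified model of how a two-input operator might combine its input watermarks. */
final class TwoInputWatermarkModel {

    // Assumption: an input that has not yet seen a watermark starts at the lowest value.
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;

    /** Records a watermark from input 1; watermarks never move backwards. */
    void onWatermark1(long watermark) {
        input1 = Math.max(input1, watermark);
    }

    /** Records a watermark from input 2; watermarks never move backwards. */
    void onWatermark2(long watermark) {
        input2 = Math.max(input2, watermark);
    }

    /** The operator's event time is bounded by its slowest input. */
    long combined() {
        return Math.min(input1, input2);
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel join = new TwoInputWatermarkModel();

        // Long-running source emits after the recovery (value taken from the log above).
        join.onWatermark1(1656503703444L);
        System.out.println("input 2 silent:       " + join.combined());

        // Once the finished side delivers Long.MAX_VALUE, input 1 alone drives progress.
        join.onWatermark2(Long.MAX_VALUE);
        System.out.println("input 2 at MAX_VALUE: " + join.combined());
    }
}
```

Under these assumptions, the combined watermark stays at the lowest value for as long as the map() side delivers nothing, regardless of how far the “Long Running Source” advances. This is consistent with the currentInput1/2Watermark metrics described in the report.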



--
This message was sent by Atlassian Jira
(v8.20.10#820010)