Posted to user@flink.apache.org by Martijn Visser <ma...@2symbols.com> on 2022/07/01 11:11:42 UTC

Re: Watermark issue with env.fromCollection() source and map()

Hi James,

Can you create a FLINK ticket for your finding? We can then see if one of
the maintainers could have a look at this issue.

Best regards,

Martijn

On Wed, 29 Jun 2022 at 16:21, James Sandys-Lumsdaine <
jas_sl@hotmail.com> wrote:

> Hi,
>
>
>
> I have done a lot more experimentation and I’m convinced there is a problem
> with how Flink handles FINISHED sources and recovery. Please read through
> the description below and consider running the attached example.
>
>
>
> The program consists of:
>
> ·         Two sources:
>
> o   One “Long Running Source” – stays alive and emits a watermark of
> DateTime.now() every 10 seconds.
>
> -  Prints a message to the console saying the watermark has been emitted.
>
> -  *Throws an exception every 5 or 10 iterations to force a recovery.*
>
> o   One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a
> message to the console and returns.
>
> ·         The “Short Lived Source” feeds into a map(), whose output is then
> joined with the “Long Running Source” in a KeyedCoProcessFunction. Once the
> source returns, Flink moves it to the FINISHED state.
>
>
>
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark
> from the map() in some situations after a recovery. The dashboard goes from
> showing this:
>
> [dashboard screenshot, not preserved in this plain-text version]
>
> To the below after a recovery (with the currentInput1/2Watermark metrics
> showing that input 2 has not received a watermark from the map, reading
> –Long.MAX_VALUE):
>
> [dashboard screenshot, not preserved in this plain-text version]
>
>
>
> The program is currently set to checkpoint every 5 seconds. By
> experimenting with a 70-second interval, I found that if only one checkpoint
> has been taken with the “Short Lived Source” in a FINISHED state since the
> last recovery, everything works fine: the restarted “Short Lived Source”
> emits its watermark and I see the “ShortLivedEmptySource emitting
> Long.MAX_VALUE watermark.” message on the console, meaning run() definitely
> executed. However, if 2 or more checkpoints are taken since the last
> recovery with the source in a FINISHED state, the console message does not
> appear and the watermark is not emitted.
>
>
>
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my
> source or from Flink if I see two or more checkpoints logged between
> recoveries. If zero or one checkpoints are made, everything is fine – the
> join gets the watermark and I see my console message. You can play with the
> checkpointing frequency as per the code comments:
>
>         // Useful checkpoint interval options:
>
>         //    5 - see the problem after the first recovery
>
>         //   70 - useful to see bad behaviour kick in after a recovery or
> two
>
>         //  120 - won't see the problem as we don't have 2 checkpoints
> within a single recovery session
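
As a rough cross-check of the "2 checkpoints within a single recovery session" condition, the sketch below counts how many completed checkpoints fall between two recoveries. The times are read off the merged log in this message for the 70-second run and expressed as whole seconds after 11:52:00; the class and method names are made up for illustration and are not part of the attached program.

```java
import java.util.List;

final class CheckpointsPerSession {
    // Counts checkpoint completion times that fall inside (sessionStart, sessionEnd].
    static int countInSession(List<Integer> checkpointTimes, int sessionStart, int sessionEnd) {
        int count = 0;
        for (int t : checkpointTimes) {
            if (t > sessionStart && t <= sessionEnd) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Completed checkpoints 1, 2 and 3: 11:53:02, 11:53:51, 11:55:01.
        List<Integer> checkpoints = List.of(62, 111, 181);

        // Session 1: job start at 11:52:31, recovery at 11:53:21.
        System.out.println("session 1: " + countInSession(checkpoints, 31, 81));

        // Session 2: restart at 11:53:22, recovery at 11:55:02.
        System.out.println("session 2: " + countInSession(checkpoints, 82, 182));
    }
}
```

Session 1 contains one checkpoint and the source runs again after the following recovery; session 2 contains two, and that is the recovery after which the "ShortLivedEmptySource" message is missing.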
>
>
>
> If I merge the Triggering/Completed checkpoint messages in the log with my
> console output, I see something like the following, which clearly shows that
> the “Short Lived Source” run() method is not executed after 2 checkpoints
> with the operators marked as FINISHED:
>
>
>
> 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE
> watermark.
>
> 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial
> watermark=1656503551268
>
> 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop
> watermark=1656503561302
>
> 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop
> watermark=1656503571302
>
> 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop
> watermark=1656503581303
>
> 2022-06-29 11:53:02.772 INFO  [Checkpoint Timer]
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 1
> (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD})
>
> 2022-06-29 11:53:02.870 INFO  [jobmanager-io-thread-10]
> o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 1 for job
> 877656d7752bc1304c2cb92790e6aefb
>
> 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop
> watermark=1656503591303
>
> 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop
> watermark=1656503601304
>
> 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------
>
> 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial
> watermark=1656503602405
>
> 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE
> watermark.
>
> 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop
> watermark=1656503612406
>
> 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop
> watermark=1656503622406
>
> 2022-06-29 11:53:51.048 INFO  [Checkpoint Timer]
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 2
> (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD})
>
> 2022-06-29 11:53:51.067 INFO  [jobmanager-io-thread-4]
> o.a.f.r.c.CheckpointCoordinator     Completed checkpoint 2 for job
> 877656d7752bc1304c2cb92790e6aefb
>
> 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop
> watermark=1656503632407
>
> 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop
> watermark=1656503642407
>
> 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop
> watermark=1656503652408
>
> 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop
> watermark=1656503662408
>
> 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop
> watermark=1656503672409
>
> 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop
> watermark=1656503682409
>
> 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop
> watermark=1656503692410
>
> 2022-06-29 11:55:01.048 INFO  [Checkpoint Timer]
> o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 3
> (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD})
>
> 2022-06-29 11:55:01.057 INFO  [jobmanager-io-thread-10]
> o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 3 for job
> 877656d7752bc1304c2cb92790e6aefb
>
> 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop
> watermark=1656503702410
>
> 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------
>
> 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial
> watermark=1656503703444       <<<<< NO “ShortLivedEmptySource” message
> after recovery
>
> 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop
> watermark=1656503713445
>
> 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop
> watermark=1656503723446
>
> 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop
> watermark=1656503733446
>
>
>
> I have also attached a longer example which shows everything working fine
> after 5 recoveries, and then breaking after the 6th.
>
>
>
> I am guessing here it has something to do with the checkpointing and
> recovery of a FINISHED source.
>
>
>
> Finally, here are some ways that allow the code to work:
>
> ·         Change the code so the “Short Lived Source” doesn’t return from
> run() and stays RUNNING (uncomment the Thread.sleep)
>
> ·         As I mentioned before, if I remove the map() operator the
> problem in the join also goes away. (I don’t see the console output but the
> join is happy)
>
> ·         Use a long enough checkpoint interval (e.g. 120 seconds) so we
> don’t have two checkpoints with FINISHED state per recovery.
>
>
>
> The fact these changes prevent the issue means I really think there’s some
> bug or inconsistency here – if somebody could explain I would really
> appreciate it.
>
>
>
> Thanks,
>
>
>
> James.
>
>
>
> *From:* Sandys-Lumsdaine, James
> *Sent:* 28 June 2022 11:15
> *To:* James Sandys-Lumsdaine <ja...@hotmail.com>
> *Subject:* RE: Watermark issue with env.fromCollection() source and map()
>
>
>
> Thanks for your reply. Yes, understood about the recovery not restoring or
> recovering watermarks.
>
>
>
> So in my example:
>
> ·         My env.fromElements() “source” emits no elements and is
> immediately marked as FINISHED by Flink which, in my understanding, emits a
> Long.MAX_VALUE which means nothing downstream is going to be blocked. This
> seems to happen after startup, before my recovery situation.
>
> ·         However, after a recovery the dashboard shows this node RUNNING
> again, so I assumed it would just follow the same pattern: when it shuts
> down again, the Long.MAX_VALUE watermark will be emitted. I think you’re
> saying it won’t be, which seems a little strange, and it means downstream
> nodes get blocked.
>
>
>
> My assumption (from documentation I’ve read) was that Flink automatically
> sends the Long.MAX_VALUE for a node that finishes (and reaches a FINISHED
> state) and I assumed this covered a recovery too. If not I will, as you
> suggest, create custom sources that remember their last watermark and
> re-emit it after a recovery. This also means env.fromElements() can’t be
> used for a system that expects to handle recovery – am I right? It seems
> more like a utility method for testing. In fact, **any** source that
> finishes must re-emit a Long.MAX_VALUE watermark on recovery, otherwise I
> will have the same problem.
>
>
>
> On a wider question… surely if one of the inputs is FINISHED it is
> discounted by a CoProcessFunction join? Or is this not the case and by
> design must receive watermarks for both inputs and therefore rely on
> sources re-emitting them?
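
To make the question about two-input operators concrete, here is a minimal plain-Java model. It assumes the usual rule that a two-input operator's event time is the minimum of the latest watermark seen on each input, with each input starting at Long.MIN_VALUE. The class and method names are hypothetical and this is not Flink's actual implementation.

```java
final class TwoInputWatermark {
    // Latest watermark seen per input; nothing received yet means Long.MIN_VALUE.
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;

    // Records a watermark on input 1 and returns the operator's combined watermark.
    long onWatermark1(long watermark) {
        input1 = Math.max(input1, watermark);
        return current();
    }

    // Records a watermark on input 2 and returns the operator's combined watermark.
    long onWatermark2(long watermark) {
        input2 = Math.max(input2, watermark);
        return current();
    }

    // The combined watermark is held back by the slower input.
    long current() {
        return Math.min(input1, input2);
    }

    public static void main(String[] args) {
        // Before recovery: input 2 delivered Long.MAX_VALUE, so only input 1 matters.
        TwoInputWatermark before = new TwoInputWatermark();
        before.onWatermark2(Long.MAX_VALUE);
        System.out.println(before.onWatermark1(1656503551268L));

        // After recovery: a fresh operator that hears nothing on input 2 stays blocked.
        TwoInputWatermark after = new TwoInputWatermark();
        System.out.println(after.onWatermark1(1656503602405L) == Long.MIN_VALUE);
    }
}
```

In this model an input that has delivered Long.MAX_VALUE is effectively discounted, because it can never be the minimum. An input that has delivered nothing since the restart holds the combined watermark at Long.MIN_VALUE, which matches what the dashboard metrics show for input 2.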
>
>
>
> Finally – what has confused me is the different behaviour in my example
> with and without the map() operation. Can you explain why removing the
> map() operation allowed my final operator to get the high watermark?
>
>
>
> Thanks,
>
>
>
> James.
>
>
>
>
>
> *From:* Schwalbe Matthias <Ma...@viseca.ch>
> *Sent:* 28 June 2022 09:17
> *To:* James Sandys-Lumsdaine <ja...@hotmail.com>; user@flink.apache.org
> *Subject:* RE: Watermark issue with env.fromCollection() source and map()
>
>
>
> Hi James,
>
>
>
> What you describe is completely expected
>
>
>
> One simple thing that might not be obvious: watermarks are neither stored
> to nor recovered from checkpoints/savepoints.
>
> The idea is that in a live system they’ll get generated shortly after
> restart, triggered by events regularly produced.
>
> In your case there are no events after recovery.
>
>
>
> In a situation where you have slow streams that block watermark progress
> after recovery, you could code an operator that explicitly stores watermarks
> per sub-partition (and takes care of rearranged subtasks), and emits restored
> watermarks into the slow stream.
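
One possible reading of this suggestion, sketched in plain Java rather than against Flink's state API: keep the last watermark per sub-partition, include that map in the snapshot, and after a restore re-emit the minimum over whichever sub-partitions the subtask now owns. All names here are hypothetical, and a real operator would have to keep this map in checkpointed operator state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class WatermarkKeeper {
    // Last watermark seen per sub-partition.
    private final Map<Integer, Long> lastSeen = new HashMap<>();

    // Remembers the highest watermark seen so far for a sub-partition.
    void onWatermark(int partition, long watermark) {
        lastSeen.merge(partition, watermark, Long::max);
    }

    // Copy of the per-partition watermarks, to be written into a checkpoint.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(lastSeen);
    }

    // Watermark to re-emit after a restore, given the sub-partitions now assigned
    // to this subtask. A partition with no stored value counts as Long.MIN_VALUE.
    static long restoredWatermark(Map<Integer, Long> snapshot, Set<Integer> assigned) {
        if (assigned.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long watermark = Long.MAX_VALUE;
        for (int partition : assigned) {
            watermark = Math.min(watermark, snapshot.getOrDefault(partition, Long.MIN_VALUE));
        }
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkKeeper keeper = new WatermarkKeeper();
        keeper.onWatermark(0, 100L);
        keeper.onWatermark(1, Long.MAX_VALUE);
        Map<Integer, Long> state = keeper.snapshot();

        // A subtask that only owns the finished partition can re-emit Long.MAX_VALUE.
        System.out.println(restoredWatermark(state, Set.of(1)) == Long.MAX_VALUE);
        // A subtask that owns both partitions is limited by the slower one.
        System.out.println(restoredWatermark(state, Set.of(0, 1)));
    }
}
```

Taking the minimum over the assigned sub-partitions is the conservative choice: a rearranged subtask never re-emits a watermark ahead of any stream it now reads.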
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* James Sandys-Lumsdaine <ja...@hotmail.com>
> *Sent:* Monday, June 27, 2022 5:12 PM
> *To:* user@flink.apache.org
> *Subject:* Watermark issue with env.fromCollection() source and map()
>
>
>
>
> Hi,
>
>
>
> I am running a Flink 1.15.0 DataStream workflow and have come across some
> weird watermarking behaviour after a recovery that might be a bug. I
> managed to reproduce the issue in a very simple program, attached.
>
>
>
> The program has 2 data sources – the first is a never-ending “Running”
> source that emits a Long.MAX_VALUE as soon as it starts, and the other is a
> source created from env.fromCollection() that has no elements and is
> expected to reach a Finished state immediately. These streams are fed into
> a KeyedCoProcessFunction and I used the Flink dashboard to monitor the
> watermark each node has reached.
>
>
>
> In the “Running” source, after a short while I force a recovery by
> throwing an exception. What I expect is for the “Join” node to reach the
> same Long.MAX_VALUE after recovery.
>
>
>
> With the above setup this works fine – however, if I add in a simple map()
> on the “empty” stream created with env.fromCollection() and then feed that
> into the join it breaks the flow of the watermark for some reason.
>
>
>
> So before the recovery the Flink dashboard shows this – notice how the
> “Join” is showing the expected watermark and the drill down on that node
> shows the Long.MAX_VALUE from both input1 and input2:
>
> [two dashboard screenshots, not preserved in this plain-text version]
>
> Then after recovery the dashboard shows this instead – notice how the
> “Join” is no longer showing a watermark and the drill down is showing that
> input2 (from the map() node) is the input preventing a high watermark:
>
> [two dashboard screenshots, not preserved in this plain-text version]
>
>
>
> What is weird is that the dashboard clearly shows the watermark of the map
> node is Long.MAX_VALUE but this isn’t being propagated to the Join.
>
>
>
> Can anyone shed any light on this? I assume I am able to create simple
> data sources with env.fromCollection() and have Flink automatically
> propagate the watermark?
>
>
>
> Many thanks in advance,
>
>
>
> James.
>
>
>

Re: Watermark issue with env.fromCollection() source and map()

Posted by James Sandys-Lumsdaine <ja...@hotmail.com>.
Thanks for the suggestion.
Raised: https://issues.apache.org/jira/browse/FLINK-28357

James.