Posted to user@flink.apache.org by Sweta Kalakuntla <sk...@bandwidth.com> on 2022/07/05 12:14:52 UTC

Re: Recover watermark from savepoint

Hi Thias,

Thank you for providing a detailed explanation. We did something similar.

The job is configured to allow no late events; it aggregates every 20
minutes and emits the value. So we save the last processed window per key
in state. During aggregation, if the current window is later than the last
processed window, we process it; otherwise we discard it. This check also
applies during savepoint recovery, so any aggregations caused by late
events are discarded.
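
A minimal sketch of that check, written as a plain-Java model rather than
actual Flink code. The class and method names are hypothetical, and the
HashMap only stands in for per-key state (in a Flink job this would
typically be keyed state such as a ValueState<Long>, which would also be
restored from the savepoint):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "last processed window per key" check.
class LastWindowGuard {
    // Assumed window size: 20 minutes, as described in the message.
    static final long WINDOW_MS = 20 * 60 * 1000L;

    // Stand-in for keyed state: key -> start of the last processed window.
    private final Map<String, Long> lastWindowStart = new HashMap<>();

    // Start of the 20-minute window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Returns true if the window should be processed, false if it should
    // be discarded because it is not later than the last processed one.
    boolean accept(String key, long currentWindowStart) {
        Long last = lastWindowStart.get(key);
        if (last != null && currentWindowStart <= last) {
            return false; // late (or repeated) window: discard
        }
        lastWindowStart.put(key, currentWindowStart);
        return true;
    }
}
```

Because the decision depends only on the stored value and not on the
current watermark, it gives the same answer before and after a restore.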

Thank you,
Sweta




On Fri, Jun 10, 2022 at 2:38 AM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Hi Sweta,
>
>
>
> It is actually a sound idea to implement a dedicated process function for
> this purpose, as David suggests.
>
> Especially if you are in a situation where waiting for a valid natural
> watermark after a restore from savepoint is not sufficient.
>
>
>
> We had a situation with input streams of different update frequencies (one
> only updated once a day, and hence only generated watermarks once a day).
>
>
>
> This is how you can approach the specific task of
>
>    - watermark storing:
>       - Create a process function
>       - Create a map that stores the latest watermark per sub-partition
>       (i.e. there are 128 sub-partitions in a job with max-parallelism of 128)
>       - Store this map into operator state with each checkpoint
>       - Create a repeating processing time timer (with high frequency
>       according to your needs), in order to yield a watermark after savepoint
>       restore
>    - watermark restoring:
>       - when restoring from operator state (because there might have been
>       a change in parallelism):
>       - determine the lowest watermark among all sub-partitions that
>       belong to the respective subtask (on operator state restore)
>       - yield this watermark in processing time handler of above timer
>       (once)
>
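
The bookkeeping in the steps above could be modelled roughly as follows.
This is a plain-Java sketch under stated assumptions, not the
implementation mentioned in this message: the class and method names are
hypothetical, the Flink parts (operator state, the processing-time timer,
emitting the watermark) are left out, and sub-partitions without a stored
watermark are simply ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of storing and restoring watermarks per sub-partition.
class WatermarkSnapshot {
    // Latest watermark seen per sub-partition.
    private final Map<Integer, Long> watermarkPerSubPartition = new HashMap<>();

    // Remember the highest watermark observed for a sub-partition.
    void record(int subPartition, long watermark) {
        watermarkPerSubPartition.merge(subPartition, watermark, Long::max);
    }

    // Copy of the map, as it would be written to state with a checkpoint.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(watermarkPerSubPartition);
    }

    // On restore: the lowest stored watermark among the sub-partitions that
    // now belong to this subtask (parallelism may have changed). Empty if
    // none of the owned sub-partitions has a stored watermark.
    static OptionalLong restoredWatermark(Map<Integer, Long> snapshot,
                                          Set<Integer> ownedSubPartitions) {
        return ownedSubPartitions.stream()
                .filter(snapshot::containsKey)
                .mapToLong(snapshot::get)
                .min();
    }
}
```

Taking the minimum is the conservative choice: the restored watermark never
runs ahead of any sub-partition the subtask now owns.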
>
>
> Feel free to ask for details, I hope this helps … I need to ask my folks
> whether I can share our implementation (20-odd lines of code).
>
>
>
> What do you think?
>
>
>
> Thias
>
>
>
>
>
> *From:* David Anderson <da...@apache.org>
> *Sent:* Thursday, June 9, 2022 11:35 AM
> *To:* User-Flink <us...@flink.apache.org>
> *Subject:* Re: Recover watermark from savepoint
>
>
>
> Sweta,
>
>
>
> Flink does not include watermarks in savepoints, nor are they included in
> aligned checkpoints. For what it's worth, I believe that with unaligned
> checkpoints in-flight watermarks are included in checkpoints, but I don't
> believe that would solve the problem, since the watermark strategy's state
> is still lost during a restart.
>
>
>
> I can't think of any way to guarantee that all possibly late events will
> be deterministically identified as late. The commonly used
> bounded-out-of-orderness watermark strategy doesn't guarantee this either,
> even without a restart (because watermarks are delayed by the auto
> watermark interval, rather than being produced at every conceivable
> opportunity).
>
>
>
> If this is a strong requirement, you could decide not to rely on
> watermarks for dropping late events, and implement the logic yourself in a
> process function.
>
>
>
> Best,
>
> David
>
>
>
> On Wed, Jun 8, 2022 at 6:10 PM Sweta Kalakuntla <sk...@bandwidth.com>
> wrote:
>
> Hi,
>
>
>
> I want to understand whether Flink saves the watermark in a savepoint and,
> if not, how we can achieve this.
>
>
>
> We are seeing an issue where, on recovery, the job processes some late
> events which would have been discarded had the job been running
> without any downtime.
>
>
>
> Thank you,
>
> Sweta
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>