Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2021/02/01 14:37:33 UTC

Re: Proctime consistency

Hi Rex,

Processing time gives you no alignment of operators across nodes. Each
operator works with its local machine clock and might be interrupted
by the OS, the Java garbage collector, etc. Timing is always best effort.
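
As a rough illustration of what missing alignment means (a minimal sketch,
not Flink code): two operators whose local clocks differ slightly can assign
the same moment to different one-second processing-time windows. The
`windowStart` helper uses plain modulo arithmetic for tumbling windows; the
object name and the clock readings are made up for this example.

```scala
object ClockSkewSketch {
  // Start of the tumbling window that contains `timestampMs`
  // (assumes non-negative timestamps and no window offset).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  def main(args: Array[String]): Unit = {
    val sizeMs = 1000L
    // Both operators look at their clock at the same real instant.
    val clockOnNodeA = 10900L // hypothetical local clock reading
    val clockOnNodeB = 11050L // node B's clock runs 150 ms ahead

    val windowA = windowStart(clockOnNodeA, sizeMs)
    val windowB = windowStart(clockOnNodeB, sizeMs)

    println(s"node A assigns the window starting at $windowA")
    println(s"node B assigns the window starting at $windowB")
  }
}
```

With these readings, node A uses the window starting at 10000 ms and node B
the one starting at 11000 ms.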

Regards,
Timo


On 27.01.21 18:16, Rex Fenley wrote:
> Hello,
> 
> I'm looking at ways to deduplicate data and found [1], but does proctime 
> get committed with operators? How does this work against clock skew on 
> different machines?
> 
> Thanks
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> 
> -- 
> 
> Rex Fenley|Software Engineer - Mobile and Backend
> 
> 
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> | 
> FOLLOW US <https://twitter.com/remindhq> | LIKE US 
> <https://www.facebook.com/remindhq>
> 


Re: Proctime consistency

Posted by Timo Walther <tw...@apache.org>.
Hi Rex,

Sorry for replying so late. Yes, your summary should be correct.

In many cases, the stress that processing-time timers put on a restore
is the reason why people eventually choose event time. But if that is
fine for your use case, that's great.

Regards,
Timo

On 05.02.21 06:26, Rex Fenley wrote:
> So if I'm reading this correctly, on checkpoint restore, if the current
> machine time (proc time) is later than the checkpointed window's proc
> time, the window will fire immediately with all the data it had
> aggregated. If the current machine time is earlier than the window's
> proc time, the window will just continue where it left off until the
> machine's clock reaches the time at which it is meant to trigger.
> 
> That actually seems perfectly fine for our use case. I see the concern:
> if a lot of proc-time windows have built up, many triggers firing at
> once could stress resources on a restore. I don't think that will matter
> in our case; we just want to make sure we don't lose any data or have
> any gaps between windows.
> 
> Please confirm whether I got this right, and thank you very much for
> your reply!
> 


Re: Proctime consistency

Posted by Rex Fenley <Re...@remind101.com>.
So if I'm reading this correctly, on checkpoint restore, if the current
machine time (proc time) is later than the checkpointed window's proc time,
the window will fire immediately with all the data it had aggregated. If the
current machine time is earlier than the window's proc time, the window will
just continue where it left off until the machine's clock reaches the time
at which it is meant to trigger.

That actually seems perfectly fine for our use case. I see the concern: if a
lot of proc-time windows have built up, many triggers firing at once could
stress resources on a restore. I don't think that will matter in our case;
we just want to make sure we don't lose any data or have any gaps between
windows.
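
On the "no gaps" point, a minimal sketch (plain Scala, not Flink code) of why
tumbling windows themselves leave no gaps: they split the time axis into
back-to-back half-open intervals, so every timestamp falls into exactly one
window. The `windowFor` helper and the sample timestamps are assumptions made
for illustration only.

```scala
object TumblingCoverageSketch {
  // Half-open window [start, end) that contains `timestampMs`
  // (assumes non-negative timestamps and no window offset).
  def windowFor(timestampMs: Long, sizeMs: Long): (Long, Long) = {
    val start = timestampMs - (timestampMs % sizeMs)
    (start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val sizeMs = 1000L
    // Timestamps on both sides of a window boundary.
    for (ts <- Seq(1998L, 1999L, 2000L, 2001L)) {
      val (start, end) = windowFor(ts, sizeMs)
      println(s"$ts -> [$start, $end)")
    }
  }
}
```

The end of one window equals the start of the next, so a record that arrives
right at a boundary still has a window to go to.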

Please confirm whether I got this right, and thank you very much for your
reply!

On Tue, Feb 2, 2021 at 3:17 AM Timo Walther <tw...@apache.org> wrote:

> As far as I know, we support ROW_NUMBER in SQL, which could give you a
> sequence number.
>
> Regarding window semantics, the processing time only determines when to
> trigger the evaluation (also mentioned here [1]). A timer is registered
> for the next evaluation. The window content and the next timer are part
> of every checkpoint and savepoint. If you restore from a
> checkpoint/savepoint, the stored next timestamp will be checked against
> the current wall clock, and an evaluation might be triggered immediately.
> For this reason, event time is usually more useful than processing time.
> If you have a lot of processing-time timers set, they might all fire
> immediately during a restore.
>
> So the window will not start over from scratch. However, in-flight data
> that was about to reach the window operator will be re-read from the
> source operator.
>
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers
>
>


Re: Proctime consistency

Posted by Timo Walther <tw...@apache.org>.
As far as I know, we support ROW_NUMBER in SQL, which could give you a
sequence number.

Regarding window semantics, the processing time only determines when to
trigger the evaluation (also mentioned here [1]). A timer is registered
for the next evaluation. The window content and the next timer are part
of every checkpoint and savepoint. If you restore from a
checkpoint/savepoint, the stored next timestamp will be checked against
the current wall clock, and an evaluation might be triggered immediately.
For this reason, event time is usually more useful than processing time.
If you have a lot of processing-time timers set, they might all fire
immediately during a restore.
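
The restore check described above can be sketched in plain Scala (a toy
model, not Flink's internal timer service): every checkpointed timer whose
timestamp is not later than the wall clock at restore time fires right away,
and the rest stay pending. The `onRestore` helper, the timer values, and the
restore time are hypothetical.

```scala
object RestoreTimerSketch {
  // Splits checkpointed timer timestamps into those already due at
  // restore time (they fire immediately) and those still pending.
  def onRestore(timersMs: Seq[Long], wallClockMs: Long): (Seq[Long], Seq[Long]) =
    timersMs.sorted.partition(_ <= wallClockMs)

  def main(args: Array[String]): Unit = {
    // Hypothetical timers stored in a checkpoint, one per open window.
    val checkpointedTimers = Seq(1000L, 2000L, 3000L, 9000L)
    // The job is restored when the local clock reads 5000 ms.
    val (fireNow, stillPending) = onRestore(checkpointedTimers, 5000L)

    println(s"fire immediately: $fireNow")
    println(s"still pending:    $stillPending")
  }
}
```

The longer the job was down, the more timers end up in the first group, which
is where the burst of evaluations on restore comes from.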

So the window will not start over from scratch. However, in-flight data
that was about to reach the window operator will be re-read from the
source operator.
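
A toy model of that behaviour, in plain Scala rather than Flink code: the
checkpoint holds the source position together with the window content as of
that position, so records that were in flight at failure time are simply read
again. The `Checkpoint` case class, `restoreAndReplay`, and the record names
are made up for this sketch.

```scala
object ReplaySketch {
  // What a checkpoint captures in this toy model: how far the source had
  // read, and what the window operator had buffered at that point.
  final case class Checkpoint(sourceOffset: Int, windowContent: Vector[String])

  // Restores the window content and re-reads the source from the stored offset.
  def restoreAndReplay(source: Vector[String], cp: Checkpoint): Vector[String] =
    cp.windowContent ++ source.drop(cp.sourceOffset)

  def main(args: Array[String]): Unit = {
    val source = Vector("r0", "r1", "r2", "r3", "r4")
    // At checkpoint time the source had emitted r0..r2 and all three
    // had reached the window operator.
    val cp = Checkpoint(sourceOffset = 3, windowContent = Vector("r0", "r1", "r2"))
    // r3 was in flight when the job failed. It is not in the checkpoint,
    // so it is read again from the source after the restore.
    println(restoreAndReplay(source, cp))
  }
}
```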

Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers


On 01.02.21 20:06, Rex Fenley wrote:
> We need to aggregate in exact row order. Is there a safe way to do
> this? Maybe with some sort of row time sequence number?
> 
> As written in another email, we're currently doing the following set of
> operations:
> val compactedUserDocsStream = userDocsStream
>   .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>   .aggregate(new CompactionAggregate())
> 
> I guess my concern is that if we restore from a checkpoint or savepoint,
> I don't understand how the window gets checkpointed and how window
> alignment works between runs of a job. Will the window just start over
> from scratch and re-process any rows that may have been in flight but
> not finished processing in the previous run's last window?
> 
> If so, then I guess everything will arrive in row order like we want it
> to. But if a window gets checkpointed with its previous proctime, then
> it may be misaligned in the next run and drop rows that were in that window.
> 


Re: Proctime consistency

Posted by Rex Fenley <Re...@remind101.com>.
We need to aggregate in exact row order. Is there a safe way to do
this? Maybe with some sort of row time sequence number?

As written in another email, we're currently doing the following set of
operations:
val compactedUserDocsStream = userDocsStream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

I guess my concern is that if we restore from a checkpoint or savepoint, I
don't understand how the window gets checkpointed and how window alignment
works between runs of a job. Will the window just start over from scratch
and re-process any rows that may have been in flight but not finished
processing in the previous run's last window?

If so, then I guess everything will arrive in row order like we want it to.
But if a window gets checkpointed with its previous proctime, then it may
be misaligned in the next run and drop rows that were in that window.

On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <tw...@apache.org> wrote:

> Hi Rex,
>
> Processing time gives you no alignment of operators across nodes. Each
> operator works with its local machine clock and might be interrupted
> by the OS, the Java garbage collector, etc. Timing is always best effort.
>
> Regards,
> Timo
>
>
