Posted to dev@flink.apache.org by Radu Tudoran <ra...@huawei.com> on 2017/04/27 08:51:02 UTC

question about rowtime processfunction - are watermarks needed?

Hi,

I am looking at the implementation of RowTimeBoundedRangeOver (in the context of Stream SQL). As I understand it, progress is driven by the timestamps of the rows, i.e., when an event arrives we register a timer so that it is processed at its timestamp (ctx.timerService.registerEventTimeTimer(triggeringTs)).

In onTimer we remove (retract) data that has expired. However, we do not consider watermarks, any allowed lateness for the events, or anything similar, which makes me ask:
Don't we need to work with watermarks when we deal with event time, and keep the events that fall within the allowed delay or the next watermark? Am I missing something? Or do we simply not consider allowedLateness in this version?

Thanks

Best regards,


Re: question about rowtime processfunction - are watermarks needed?

Posted by Fabian Hueske <fh...@gmail.com>.
I wouldn't say that the batch / stream unification doesn't hold.
If you make the watermarks lag far enough behind (i.e., you do not have any late data),
you get exact results, although not very timely ones.

Watermarks basically trade completeness of the result for result latency.
With support for late updates, you can get early (possibly incomplete)
results and correct them when you get additional (late) data.

However, you need to discard state at some point in time.
If you receive late data for which the required state was discarded, this
is the point where batch and streaming results start to diverge.
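
As a rough illustration of the three cases above, here is a minimal sketch in plain Scala (it is not Flink code). The object LateDataPolicy, the Outcome names, and the retention parameter are all made up for this example; retention stands for however long state is kept behind the watermark before it is discarded.

```scala
object LateDataPolicy {
  sealed trait Outcome
  case object OnTime extends Outcome      // record is ahead of the watermark
  case object LateUpdate extends Outcome  // late, but state is still kept: the earlier result can be corrected
  case object Dropped extends Outcome     // late and the required state was already discarded

  // Hypothetical classification of a record by its timestamp.
  // Assumes `watermark - retention` does not overflow.
  def classify(timestamp: Long, watermark: Long, retention: Long): Outcome =
    if (timestamp > watermark) OnTime
    else if (timestamp > watermark - retention) LateUpdate
    else Dropped
}
```

Only records that end up in the Dropped case make the streaming result differ from the batch result; a larger retention shrinks that case at the cost of more state.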

Cheers, Fabian


RE: question about rowtime processfunction - are watermarks needed?

Posted by Radu Tudoran <ra...@huawei.com>.
Hi,
Thanks again, Fabian, for the explanation.
Considering what you said, is there still a duality with the batch case? As the stream case is non-deterministic, I would say that the duality, in the sense that a query on the stream should return the same result as the same query on the batched data, no longer holds?
I am just trying to get a deeper understanding of this, which I think will also apply to the other functions and SQL operators... sorry for bothering you with this.


Re: question about rowtime processfunction - are watermarks needed?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Radu,

yes, that might happen in a parallel setup, and it depends on the "speed" of the
parallel threads.
An operator only advances its own event-time clock to the minimum of
the last watermarks received from each input channel.
If one input channel is "slow", the event time of the operator lags behind,
and "late" events from the other threads are correctly processed because the
operator's event time has not been advanced yet.
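
To illustrate the minimum rule, here is a small toy model in plain Scala (not the Flink implementation). The class OperatorClock and its methods are names invented for this sketch, and it assumes that a record is late when its timestamp is at or behind the operator's current event time.

```scala
// Toy model: the operator's event-time clock is the minimum of the latest
// watermark seen on each input channel.
final class OperatorClock(numChannels: Int) {
  // Latest watermark per channel; Long.MinValue means "none received yet".
  private val lastWatermark = Array.fill(numChannels)(Long.MinValue)

  // Record a watermark from one channel and return the operator's event time.
  def onWatermark(channel: Int, watermark: Long): Long = {
    lastWatermark(channel) = math.max(lastWatermark(channel), watermark)
    currentEventTime
  }

  def currentEventTime: Long = lastWatermark.min

  // Assumption of this sketch: late means timestamp <= current event time.
  def isLate(timestamp: Long): Boolean = timestamp <= currentEventTime
}
```

With two channels, a record with timestamp 1000 is not late as long as only channel 0 has delivered watermark 14000, and becomes late once channel 1 has delivered it too. Which of the two happens first depends on how fast the channels are.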

So, event-time is not deterministic when it comes to which records are
dropped.
The watermark documentation might be helpful as well [1].

Cheers,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams


RE: question about rowtime processfunction - are watermarks needed?

Posted by Radu Tudoran <ra...@huawei.com>.
Re-hi,

I debugged the test for the event rowtime a bit.

I tested testBoundNonPartitionedEventTimeWindowWithRange from the SQLITCase class.

I would expect that once a watermark is emitted, 1) onTimer is called to process the events that have arrived so far, and 2) late events that arrive afterwards are dropped. However, it seems that almost the entire input can arrive in the processElement function before onTimer is triggered.

Moreover, if you modify the input to add an out-of-order event (see the dataset below, where I added an event with timestamp 1000 after watermark 14000), I would expect it to be dropped. However, in different runs it can happen that it is not dropped. Basically, it can happen that onTimer has not been triggered yet when this event arrives, so the event is registered. Is this correct? Am I missing something?


  @Test
  def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
    val data = Seq(
      Left((1500L, (1L, 15, "Hello"))),
      Left((1600L, (1L, 16, "Hello"))),
      Left((1000L, (1L, 1, "Hello"))),
      Left((2000L, (2L, 2, "Hello"))),
      Right(1000L),
      Left((2000L, (2L, 2, "Hello"))),
      Left((2000L, (2L, 3, "Hello"))),
      Left((3000L, (3L, 3, "Hello"))),
      Right(2000L),
      Left((4000L, (4L, 4, "Hello"))),
      Right(3000L),
      Left((5000L, (5L, 5, "Hello"))),
      Right(5000L),
      Left((6000L, (6L, 6, "Hello"))),
      Left((6500L, (6L, 65, "Hello"))),
      Right(7000L),
      Left((9000L, (6L, 9, "Hello"))),
      Left((9500L, (6L, 18, "Hello"))),
      Left((9000L, (6L, 9, "Hello"))),
      Right(10000L),
      Left((10000L, (7L, 7, "Hello World"))),
      Left((11000L, (7L, 17, "Hello World"))),
      Left((11000L, (7L, 77, "Hello World"))),
      Right(12000L),
      Left((14000L, (7L, 18, "Hello World"))),
      Right(14000L),
      Left((15000L, (8L, 8, "Hello World"))),
      Left((1000L, (8L, 8, "Too late - Hello World"))),   // event is out of order and should be dropped
      Right(17000L),
      Left((20000L, (20L, 20, "Hello World"))),
      Right(19000L))
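
To show what I would expect on a single, strictly ordered input, here is a minimal sketch in plain Scala (no Flink involved). The object WatermarkReplay and the function lateRecords are names made up for this illustration, the payload is simplified to a String, and a record is assumed to be late when its timestamp is at or behind the latest watermark seen so far. The value tail is the end of the dataset above.

```scala
object WatermarkReplay {
  // Left = (timestamp, payload) record, Right = watermark, as in the test data.
  type Element = Either[(Long, String), Long]

  // Returns the records whose timestamp is at or behind the latest watermark
  // seen before they arrive.
  def lateRecords(stream: Seq[Element]): Seq[(Long, String)] = {
    var watermark = Long.MinValue
    val late = Seq.newBuilder[(Long, String)]
    stream.foreach {
      case Right(wm)                        => watermark = math.max(watermark, wm)
      case Left(rec) if rec._1 <= watermark => late += rec
      case Left(_)                          => ()
    }
    late.result()
  }

  // Simplified tail of the dataset, starting at watermark 12000.
  val tail: Seq[Element] = Seq(
    Right(12000L),
    Left((14000L, "Hello World")),
    Right(14000L),
    Left((15000L, "Hello World")),
    Left((1000L, "Too late - Hello World")),
    Right(17000L),
    Left((20000L, "Hello World")),
    Right(19000L))
}
```

In this model only the record with timestamp 1000 is reported as late. If the same record reaches the operator before any watermark has been processed, nothing is reported, which matches what I see in some runs.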





RE: question about rowtime processfunction - are watermarks needed?

Posted by Radu Tudoran <ra...@huawei.com>.
Thanks for the explanation.


Re: question about rowtime processfunction - are watermarks needed?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Radu,

Event-time processing requires watermarks. Operators use watermarks to
compute the current event time.
The ProcessFunctions for over range windows use the TimerServices to group
elements by time.
In the case of event time, the timers are triggered by the event time of the
operator, which is derived from the received watermarks.
In the case of processing time, the timers are triggered based on the wall-clock
time of the operator.

So by using event-time timers, we implicitly rely on the watermarks, because
the timers are triggered based on the received watermarks.
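
As a rough sketch of this dependency, here is a toy model in plain Scala (it is not the Flink TimerService). The class EventTimeTimers and the method advanceWatermark are names invented for the example; only registerEventTimeTimer mirrors the call mentioned in the question. The model assumes that a timer fires once the watermark reaches or passes its timestamp.

```scala
import scala.collection.mutable

// Toy model of an operator's event-time timers.
final class EventTimeTimers {
  private var currentWatermark: Long = Long.MinValue
  // Pending timer timestamps, sorted and deduplicated.
  private val timers = mutable.TreeSet.empty[Long]

  def registerEventTimeTimer(ts: Long): Unit = timers += ts

  // Advance event time to `watermark` (never backwards) and return the
  // timers that fire, in timestamp order.
  def advanceWatermark(watermark: Long): List[Long] = {
    if (watermark > currentWatermark) currentWatermark = watermark
    val due = timers.takeWhile(_ <= currentWatermark).toList
    timers --= due
    due
  }
}
```

Registering timers for 1500, 1600 and 1000 does nothing by itself. Only advancing the watermark to 1000 fires the timer for 1000, and advancing it to 2000 fires 1500 and 1600. Without watermarks, no event-time timer would ever fire.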

Best, Fabian

