Posted to dev@beam.apache.org by "jmackay@godaddy.com" <jm...@godaddy.com> on 2020/08/03 13:20:07 UTC

Stateful Pardo Question

I am confused about the behavior of timers on a simple stateful pardo. I have put together a little repro here: https://github.com/randomsamples/pardo_repro

I basically want to build something like a session window, accumulating events until quiescence of the stream for a given key and gap time, then output results. But it appears that the timer is not firing when the watermark has passed its expiration time, so the event stream is not being split as I would have expected. Would love some help getting this to work; the behavior is for a project I’m working on.

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
+1

From: Reza Ardeshir Rokni <ra...@gmail.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Sunday, August 9, 2020 at 5:05 PM
To: dev <de...@beam.apache.org>
Subject: Re: Stateful Pardo Question

+1 on having the behavior clearly documented. It would also be great to try to add more state and timer patterns to the Beam docs patterns page https://beam.apache.org/documentation/patterns/overview/.

I think it might be worth describing these kinds of patterns with an emphasis on the OnTimer being where the work happens. One thing that would make all of this a lot easier, by reducing the boilerplate code that would need to be written, is a sorted map state (a topic of discussion on a few threads).

On Mon, 10 Aug 2020 at 01:16, Reuven Lax <re...@google.com>> wrote:
Timers in Beam are considered "eligible to fire" once the watermark has advanced. This is not the same as saying that they will fire immediately. You should not assume ordering between the elements and the timers.

This is one reason (among many) that Beam does not provide a "read watermark" primitive, as it leads to confusions such as this. Since there is no read-watermark operator, the only way for a user's ParDo to view that the watermark has been set is to set a timer and wait for it to expire. Watermarks on their own can act in very non-intuitive ways (due to asynchronous advancement), so generally we encourage people to reason about timers and windowing in their code instead.

Reuven

On Sun, Aug 9, 2020 at 9:39 AM jmackay@godaddy.com<ma...@godaddy.com> <jm...@godaddy.com>> wrote:
I understand that watermarks are concurrently advanced, and that they are estimates and not precise, but I’m not sure this is relevant in this case. In this repro code we are in processElement() and the watermark HAS advanced but the timer has not been called even though we asked the runtime to do that. In this case we are in a per-key stateful operating mode and our timer should not be shared with any other runners (is that correct?) so it seems to me that we should be able to operate in a manner that is locally consistent from the point of view of the DoFn we are writing. That is to say, _before_ we enter processElement we check any local timers first. I would argue that this would be far more sensible from the author’s perspective.

From: Reuven Lax <re...@google.com>>
Reply-To: "dev@beam.apache.org<ma...@beam.apache.org>" <de...@beam.apache.org>>
Date: Thursday, August 6, 2020 at 11:57 PM
To: dev <de...@beam.apache.org>>
Subject: Re: Stateful Pardo Question

On Tue, Aug 4, 2020 at 1:08 PM jmackay@godaddy.com<ma...@godaddy.com> <jm...@godaddy.com>> wrote:
So, after some additional digging, it appears that Beam does not consistently check for timer expiry before calling process. The result is that the watermark may have moved beyond your timer expiry, and if you’re counting on the timer callback happening at the time you set it for, that simply may NOT have happened when you are in DoFn.process(). You can “fix” the behavior by simply checking the watermark manually in process() and doing what you would normally do for timestamp expiry before proceeding. See my latest updated code reproducing the issue and showing the fix at https://github.com/randomsamples/pardo_repro.

I would argue that users of this API will naturally expect that timer callback semantics will guarantee that when they are in process(), if the current watermark is past a timer’s expiry, the timer callback in question will have been called. Is there any reason why this isn’t happening? Am I misunderstanding something?

Timers do not expire synchronously with the watermark advancing. So if you have a timer set for 12pm and the watermark advances past 12pm, that timer is now eligible to fire, but might not fire immediately. Some other elements may process before that timer fires.

There are multiple reasons for this, but one is that Beam does not guarantee that watermark advancement is synchronous with element processing. The watermark might advance suddenly while in the middle of processing an element, or at any other time. This makes it impossible (or at least, exceedingly difficult) to really provide the guarantee you expected.
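
To make the "eligible to fire" versus "has fired" distinction concrete, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: the class EligibleTimerModel and all of its methods are hypothetical, it uses no Beam APIs, and it does not describe how any actual runner schedules work. It only shows how an element that resets the timer can run while an earlier timer is eligible but not yet delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; this is NOT how any Beam runner is implemented.
class EligibleTimerModel {
    final List<String> log = new ArrayList<>();
    long watermark = Long.MIN_VALUE; // minutes; advanced independently of elements
    long timer = -1;                 // pending timer in minutes, -1 means "none set"

    // The watermark moves on its own; advancing it does not run any callback.
    void advanceWatermark(long toMinutes) {
        watermark = Math.max(watermark, toMinutes);
    }

    // A timer is eligible once the watermark has reached its target time.
    boolean timerEligible() {
        return timer >= 0 && watermark >= timer;
    }

    // Mirrors the repro's logic: every element resets the timer to ts + gap.
    void processElement(long tsMinutes, long gapMinutes) {
        String note = timerEligible()
                ? " (timer@" + timer + " was eligible but had not run)"
                : "";
        timer = tsMinutes + gapMinutes;
        log.add("element@" + tsMinutes + " resets timer to " + timer + note);
    }

    // The hypothetical runner delivers eligible timers only when it gets to them.
    void deliverTimers() {
        if (timerEligible()) {
            log.add("timer@" + timer + " fired");
            timer = -1;
        }
    }

    public static void main(String[] args) {
        EligibleTimerModel m = new EligibleTimerModel();
        m.processElement(0, 15);
        m.processElement(5, 15);   // timer now at 20
        m.advanceWatermark(30);    // timer@20 becomes eligible
        m.processElement(40, 15);  // runs first and overwrites the eligible timer
        m.deliverTimers();         // nothing fires: timer is now 55, watermark is 30
        m.log.forEach(System.out::println);
    }
}
```

In this model the element at minute 40 is processed before the eligible timer is delivered, so the reset moves the timer to minute 55 and the boundary at minute 20 is never observed.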

Reuven

From: "jmackay@godaddy.com<ma...@godaddy.com>" <jm...@godaddy.com>>
Reply-To: "dev@beam.apache.org<ma...@beam.apache.org>" <de...@beam.apache.org>>
Date: Monday, August 3, 2020 at 10:51 AM
To: "dev@beam.apache.org<ma...@beam.apache.org>" <de...@beam.apache.org>>
Subject: Re: Stateful Pardo Question

Yeah, unless I am misunderstanding something. The output from my repro code shows event timestamp and the context timestamp every time we process an event.

Receiving event at: 2000-01-01T00:00:00.000Z
Resetting timer to : 2000-01-01T00:15:00.000Z
Receiving event at: 2000-01-01T00:05:00.000Z
Resetting timer to : 2000-01-01T00:20:00.000Z <-- Shouldn’t the timer have fired before we processed the next event?
Receiving event at: 2000-01-01T00:40:00.000Z
Why didnt the timer fire?
Resetting timer to : 2000-01-01T00:55:00.000Z
Receiving event at: 2000-01-01T00:45:00.000Z
Resetting timer to : 2000-01-01T01:00:00.000Z
Receiving event at: 2000-01-01T00:50:00.000Z
Resetting timer to : 2000-01-01T01:05:00.000Z
Timer firing at: 2000-01-01T01:05:00.000Z
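
For reference, the split that the poster expected from these timestamps can be computed offline. The following is a minimal sketch in plain Java, assuming a 15-minute gap (taken from the question later in this message) and a rule that a new session starts when two consecutive sorted timestamps are at least the gap apart. The class GapSessions and its split method are hypothetical names, and no Beam APIs are used.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Offline illustration only: no Beam APIs are involved.
class GapSessions {
    // Sorts the timestamps, then starts a new session whenever the distance
    // to the previous timestamp is at least `gap`.
    static List<List<Instant>> split(List<Instant> timestamps, Duration gap) {
        List<Instant> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<List<Instant>> sessions = new ArrayList<>();
        List<Instant> current = new ArrayList<>();
        for (Instant ts : sorted) {
            if (!current.isEmpty()) {
                Instant previous = current.get(current.size() - 1);
                if (Duration.between(previous, ts).compareTo(gap) >= 0) {
                    sessions.add(current);
                    current = new ArrayList<>();
                }
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2000-01-01T00:00:00Z");
        List<Instant> events = new ArrayList<>();
        // Minute offsets of the events shown in the log above.
        for (int minutes : new int[] {0, 5, 40, 45, 50}) {
            events.add(base.plus(Duration.ofMinutes(minutes)));
        }
        for (List<Instant> session : split(events, Duration.ofMinutes(15))) {
            System.out.println(session);
        }
    }
}
```

With the five events from the log, this yields two sessions: the events at 00:00 and 00:05, and the events at 00:40, 00:45 and 00:50. Because the input is sorted first, the result does not depend on arrival order.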

From: Reuven Lax <re...@google.com>>
Reply-To: "dev@beam.apache.org<ma...@beam.apache.org>" <de...@beam.apache.org>>
Date: Monday, August 3, 2020 at 10:02 AM
To: dev <de...@beam.apache.org>>
Subject: Re: Stateful Pardo Question

Are you sure that there is a 15 minute gap in your data?


Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
Ahhh I see. Thank you very much for this additional info. Really helpful! I think after considering further, it’s probably more appropriate and less risky in my current scenario to try to use the Session combiner. I did really like the Stateful ParDo way of doing things though; if it were simpler to get correct and as performant as Windows (I understand that Flink has some special optimizations for windowing that go all the way down into the RocksDB code), I might have liked to see this method work out.

Thanks again!

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Sunday, August 9, 2020 at 11:25 PM
To: dev <de...@beam.apache.org>
Subject: Re: Stateful Pardo Question

Looking at the code in the repo, it seems to assume that context.timestamp() is the "watermark" time. It is not - context.timestamp() is the time of the current element being processed. Generally the watermark will always be smaller than the timestamp of the current element, as the watermark is a lower bound on element timestamps (so you can't really check context.timestamp() to determine if a timer is eligible to fire). It's also worth mentioning that Beam provides no ordering guarantees on the input elements (unless you are using TestStream in a unit test). In theory they could arrive in reverse timestamp order. In the real world that degree of disorder is probably unlikely (and would be inefficient, as the watermark would then not advance until all elements were processed); however, the model makes no guarantees about order.

The fact that inputs can arrive in any order means that the sessions code you are trying to implement would need some more complexity if you wanted it to be correct. The problem is that you may have buffered elements from multiple different sessions in your bag, and you may see those elements out of order. Resetting the timer to event.getTimestamp().plus(SESSION_TIMEOUT) will cause you to potentially create a timer that is too early. There are various ways to solve this (e.g. storing an interval tree in a separate state tag so you can keep track of which sessions are in flight). The upcoming TimestampOrderedList state type will also help to make this sort of use case easier and more efficient.

Reuven


Re: Stateful Pardo Question

Posted by Reuven Lax <re...@google.com>.
Looking at the code in the repo, it seems to assume that context.timestamp()
is the "watermark" time. It is not - context.timestamp() is the time of the
current element being processed. Generally the watermark will always be
smaller than the timestamp of the current element, as the watermark is a
lower bound on element timestamps (so you can't really check
context.timestamp() to determine if a timer is eligible to fire). It's also
worth mentioning that Beam provides no ordering guarantees on the input
elements (unless you are using TestStream in a unit test). In theory they
could arrive in reverse timestamp order. In the real world that degree of
disorder is probably unlikely (and would be inefficient, as the watermark
would then not advance until all elements were processed), however the
model makes no guarantees about order.

The fact that inputs can arrive in any order means that the sessions code
you are trying to implement would need some more complexity if you wanted
it to be correct. The problem is that you may have buffered elements from
multiple different sessions in your bag, and you may see those elements out
of order. Resetting the timer to event.getTimestamp().plus(SESSION_TIMEOUT)
will cause you to potentially create a timer that is too early. There are
various ways to solve this (e.g. storing an interval tree in a separate
state tag so you can keep track of which sessions are in flight). The
upcoming TimestampOrderedList state type will also help to make this sort
of use case easier and more efficient.
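
As a small illustration of the "timer that is too early" problem, the sketch below compares two ways of picking the timer target when elements arrive out of order. It is plain Java with no Beam APIs; the class TimerTargets and both methods are hypothetical, and the 15-minute SESSION_TIMEOUT is an assumed value. Tracking the maximum timestamp seen is a deliberately simpler technique than the interval tree mentioned above: it only avoids moving the timer backwards and does not separate multiple sessions buffered in the same bag.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, not a Beam DoFn.
class TimerTargets {
    // Assumed gap; the real value lives in the poster's repro.
    static final Duration SESSION_TIMEOUT = Duration.ofMinutes(15);

    // Naive approach: the most recently arrived element decides the timer.
    static Instant naive(List<Instant> arrivals) {
        Instant timer = null;
        for (Instant ts : arrivals) {
            timer = ts.plus(SESSION_TIMEOUT);
        }
        return timer;
    }

    // Keeps the largest event timestamp seen so far, so a late-arriving
    // older element cannot pull the timer earlier. In a real stateful DoFn
    // this maximum would have to be kept in per-key state.
    static Instant maxBased(List<Instant> arrivals) {
        Instant max = null;
        for (Instant ts : arrivals) {
            if (max == null || ts.isAfter(max)) {
                max = ts;
            }
        }
        return max == null ? null : max.plus(SESSION_TIMEOUT);
    }

    public static void main(String[] args) {
        // The 00:40 element arrives before the 00:05 element.
        List<Instant> arrivals = Arrays.asList(
                Instant.parse("2000-01-01T00:40:00Z"),
                Instant.parse("2000-01-01T00:05:00Z"));
        System.out.println("naive timer:     " + naive(arrivals));
        System.out.println("max-based timer: " + maxBased(arrivals));
    }
}
```

With this arrival order the naive version ends up with a timer at 00:20, which is earlier than the 00:40 element it is supposed to cover, while the max-based version keeps the timer at 00:55.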

Reuven


Re: Stateful Pardo Question

Posted by Reza Ardeshir Rokni <ra...@gmail.com>.
+1 on having the behavior clearly documented. It would also be great to try
to add more state and timer patterns to the Beam docs patterns page
https://beam.apache.org/documentation/patterns/overview/.

I think it might be worth describing these kinds of patterns with an
emphasis on the OnTimer being where the work happens. One thing that would
make all of this a lot easier, by reducing the boilerplate code that would
need to be written, is a sorted map state (a topic of discussion on a few
threads).


Re: Stateful Pardo Question

Posted by Reuven Lax <re...@google.com>.
Timers in Beam are considered "eligible to fire" once the watermark has
advanced. This is not the same as saying that they will fire immediately.
You should not assume ordering between the elements and the timers.

This is one reason (among many) that Beam does not provide a "read
watermark" primitive, as it leads to confusions such as this. Since there
is no read-watermark operator, the only way for a user's ParDo to view that
the watermark has been set is to set a timer and wait for it to expire.
Watermarks on their own can act in very non-intuitive ways (due to
asynchronous advancement), so generally we encourage people to reason about
timers and windowing in their code instead.

Reuven

On Sun, Aug 9, 2020 at 9:39 AM jmackay@godaddy.com <jm...@godaddy.com>
wrote:

> I understand that watermarks are concurrently advanced, and that they are
> estimates and not precise, but I’m not sure this is relevant in this case.
> In this repro code we are in processElement() and the watermark HAS
> advanced, but the timer has not been called even though we asked the runtime
> to do that. In this case we are in a per-key stateful operating mode and
> our timer should not be shared with any other runners (is that correct?), so
> it seems to me that we should be able to operate in a manner that is
> locally consistent from the point of view of the DoFn we are writing. That
> is to say, _before_ we enter processElement() we check any local timers
> first. I would argue that this would be far more sensible from the author's
> perspective.
>
>
>
> *From: *Reuven Lax <re...@google.com>
> *Reply-To: *"dev@beam.apache.org" <de...@beam.apache.org>
> *Date: *Thursday, August 6, 2020 at 11:57 PM
> *To: *dev <de...@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
> On Tue, Aug 4, 2020 at 1:08 PM jmackay@godaddy.com <jm...@godaddy.com>
> wrote:
>
> So, after some additional digging, it appears that Beam does not
> consistently check for timer expiry before calling process(). As a result,
> the watermark may have moved beyond your timer's expiry, and if you're
> counting on the timer callback having run at the time you set it for, that
> simply may NOT have happened when you are in DoFn.process(). You can “fix”
> the behavior by checking the watermark manually in process() and doing what
> you would normally do on timer expiry before proceeding. See my latest
> updated code reproducing the issue and showing the fix at
> https://github.com/randomsamples/pardo_repro.
>
> I would argue that users of this API will naturally expect timer callback
> semantics to guarantee that when they are in process(), if the current
> watermark is past a timer's expiry, the timer callback in question will
> have been called. Is there any reason why this isn’t happening? Am I
> misunderstanding something?
>
>
>
> Timers do not expire synchronously with the watermark advancing. So if you
> have a timer set for 12pm and the watermark advances past 12pm, that timer
> is now eligible to fire, but might not fire immediately. Some other
> elements may process before that timer fires.
>
>
>
> There are multiple reasons for this, but one is that Beam does not
> guarantee that watermark advancement is synchronous with element
> processing. The watermark might advance suddenly while in the middle of
> processing an element, or at any other time. This makes it impossible (or
> at least, exceedingly difficult) to really provide the guarantee you
> expected.
>
>
>
> Reuven
>
>
>
> *From: *"jmackay@godaddy.com" <jm...@godaddy.com>
> *Reply-To: *"dev@beam.apache.org" <de...@beam.apache.org>
> *Date: *Monday, August 3, 2020 at 10:51 AM
> *To: *"dev@beam.apache.org" <de...@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
> Yeah, unless I am misunderstanding something. The output from my repro
> code shows event timestamp and the context timestamp every time we process
> an event.
>
> Receiving event at: 2000-01-01T00:00:00.000Z
> Resetting timer to : 2000-01-01T00:15:00.000Z
> Receiving event at: 2000-01-01T00:05:00.000Z
> Resetting timer to : 2000-01-01T00:20:00.000Z <-- Shouldn’t the timer have fired before we processed the next event?
> Receiving event at: 2000-01-01T00:40:00.000Z
> Why didnt the timer fire?
> Resetting timer to : 2000-01-01T00:55:00.000Z
> Receiving event at: 2000-01-01T00:45:00.000Z
> Resetting timer to : 2000-01-01T01:00:00.000Z
> Receiving event at: 2000-01-01T00:50:00.000Z
> Resetting timer to : 2000-01-01T01:05:00.000Z
> Timer firing at: 2000-01-01T01:05:00.000Z
>
>
>
> *From: *Reuven Lax <re...@google.com>
> *Reply-To: *"dev@beam.apache.org" <de...@beam.apache.org>
> *Date: *Monday, August 3, 2020 at 10:02 AM
> *To: *dev <de...@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
> Are you sure that there is a 15 minute gap in your data?
>
>
>
> On Mon, Aug 3, 2020 at 6:20 AM jmackay@godaddy.com <jm...@godaddy.com>
> wrote:
>
> I am confused about the behavior of timers on a simple stateful pardo. I
> have put together a little repro here:
> https://github.com/randomsamples/pardo_repro
>
>
>
> I basically want to build something like a session window, accumulating
> events until quiescence of the stream for a given key and gap time, then
> output results. But it appears that the timer is not firing when the
> watermark is passed it expiration time, so the event stream is not being
> split as I would have expected. Would love some help getting this work, the
> behavior is for a project I’m working on.
>
>

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
I understand that watermarks are concurrently advanced, and that they are estimates and not precise, but I’m not sure this is relevant in this case. In this repro code we are in processElement() and the watermark HAS advanced, but the timer has not been called even though we asked the runtime to do that. In this case we are in a per-key stateful operating mode and our timer should not be shared with any other runners (is that correct?), so it seems to me that we should be able to operate in a manner that is locally consistent from the point of view of the DoFn we are writing. That is to say, _before_ we enter processElement() we check any local timers first. I would argue that this would be far more sensible from the author's perspective.
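
For reference, the manual check mentioned earlier in the thread might look roughly like the following plain-Python sketch. It is not Beam code: the class EagerSessionFn, its fields, and the use of the element timestamp as a stand-in for the watermark are all assumptions made for illustration, and the check is only valid if elements for a key arrive in timestamp order.

```python
class EagerSessionFn:
    """Hypothetical per-key logic that runs overdue timer work inside process()."""

    def __init__(self, gap):
        self.gap = gap        # required quiet period, in minutes
        self.buffer = []      # timestamps of the session currently being built
        self.expiry = None    # expiry time of the pending timer, kept in state
        self.emitted = []     # sessions that have been output so far

    def _flush(self):
        # The work the timer callback would normally do.
        if self.buffer:
            self.emitted.append(self.buffer)
        self.buffer = []
        self.expiry = None

    def process(self, ts):
        # Manual check: if this element is at or past the stored expiry, the
        # timer was eligible but has not run yet, so do its work first.
        if self.expiry is not None and ts >= self.expiry:
            self._flush()
        self.buffer.append(ts)
        self.expiry = ts + self.gap

    def on_timer(self):
        self._flush()


fn = EagerSessionFn(gap=15)
for minute in (0, 5, 40, 45, 50):
    fn.process(minute)
fn.on_timer()  # the single timer callback that eventually arrives
print(fn.emitted)
```

With out-of-order input this check can close a session too early, which is the ordering assumption the replies in this thread warn against.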

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
Okay, yes, I think I understand what you’re saying. Use processElement just to continually accumulate events, and set the timer only if it has never been set before. Then in onTimer you need to check each event to see if it’s in the closing session by comparing it with the last set timer time. For each event in the closing session, remove it from the bag and send it on its way. Finally, reset the timer to the last element’s time if needed, or mark it as unset if no additional events are left in the bag. This will work…

But isn’t that quite a bit of gymnastics, and highly unintuitive? I think this behavior from the runtime is not what users will expect. At the least, it should be clearly documented that you cannot expect timers to be processed before processElement is called _even if the watermark has advanced_... The user code would be MUCH more sensible if the timer check happened before processElement().

From: Reza Ardeshir Rokni <ra...@gmail.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Sunday, August 9, 2020 at 9:45 AM
To: dev <de...@beam.apache.org>
Subject: Re: Stateful Pardo Question

I think the difference is that you could try doing the timer resets within the OnTimer code (after the initial start) rather than in onProcess(). This way it doesn't matter if more events arrive before the timer fires, as you would sort them when the timer actually does go off. You would need to store your elements as TimestampedValue<T>, of course. This assumes I have understood the use case correctly.

Sorry I won't have time to try it out myself this week, but it's a worthwhile pattern to explore and publish on the patterns page.

Cheers
Rez

On Mon, 10 Aug 2020, 00:30 jmackay@godaddy.com <jm...@godaddy.com> wrote:
This is pretty much what the repro code does. The problem is that it doesn’t work the way we would expect, because the timer isn’t called before processElement.

From: Reza Ardeshir Rokni <ra...@gmail.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Friday, August 7, 2020 at 5:34 AM
To: dev <de...@beam.apache.org>
Subject: Re: Stateful Pardo Question

Hi,

One possible approach (I have not tried it out, so I might be missing cases) is to reset the timer from within the OnTimer code.

So maybe you start the timer in OnProcess to go off at current+requiredGap. Then in OnTimer, you check the list of elements and output a session if there is nothing new. Then reset the timer to go off either at latestTimestampValue+requiredGap if there were new elements, or at currentEventTime+requiredGap. If a timer fires and there are no elements in the bag, then you don't reset it.

You will need to keep state to know you have a timer firing so as not to set it again in OnProcess as there is no read() for timers.

Also we don't have a sorted map state, so you will take a performance hit as you will need to keep sorting the events every OnTimer...

Cheers
Reza


Re: Stateful Pardo Question

Posted by Reza Ardeshir Rokni <ra...@gmail.com>.
I think the difference is that you could try doing the timer resets within
the OnTimer code (after the initial start) rather than in onProcess(). This
way it doesn't matter if more events arrive before the timer fires, as you
would sort them when the timer actually does go off. You would need to
store your elements as TimestampedValue<T>, of course. This assumes I have
understood the use case correctly.

Sorry I won't have time to try it out myself this week, but it's a
worthwhile pattern to explore and publish on the patterns page.

Cheers
Rez

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
This is pretty much what the repro code does. The problem is that it doesn’t work the way we would expect, because the timer isn’t called before processElement.

Re: Stateful Pardo Question

Posted by Reza Ardeshir Rokni <ra...@gmail.com>.
Hi,

One possible approach (I have not tried it out, so I might be missing cases)
is to reset the timer from within the OnTimer code.

So maybe you start the timer in OnProcess to go off at
current+requiredGap. Then in OnTimer, you check the list of elements and
output a session if there is nothing new. Then reset the timer to go off
either at latestTimestampValue+requiredGap if there were new elements, or at
currentEventTime+requiredGap. If a timer fires and there are no elements in
the bag, then you don't reset it.

You will need to keep state to know that a timer is pending, so as not to
set it again in OnProcess, as there is no read() for timers.

Also we don't have a sorted map state, so you will take a performance hit
as you will need to keep sorting the events in every OnTimer call...

Cheers
Reza
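
A rough sketch of this pattern in plain Python, for anyone who wants to
experiment with the logic before wiring it into a DoFn. Nothing here is a
Beam API: SessionState, process, on_timer and timer_at are hypothetical
names, timestamps are plain integers (minutes), and the rule used to decide
that a session is closed (last event + gap <= the time the timer was set
for) is an assumption about what the timer firing lets you conclude.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Event = Tuple[int, str]  # (event-time timestamp in minutes, payload)


@dataclass
class SessionState:
    """Per-key state: a bag of buffered events plus the time of the pending timer."""
    gap: int
    bag: List[Event] = field(default_factory=list)
    timer_at: Optional[int] = None  # None means "no timer pending"

    def process(self, event: Event) -> None:
        # Only buffer. Start the timer if none is pending; the flag is kept in
        # state because a timer cannot be read back.
        self.bag.append(event)
        if self.timer_at is None:
            self.timer_at = event[0] + self.gap

    def on_timer(self, fire_ts: int) -> List[List[Event]]:
        # Sort the buffer and cut it into runs separated by at least `gap`.
        runs: List[List[Event]] = []
        for event in sorted(self.bag):
            if runs and event[0] - runs[-1][-1][0] < self.gap:
                runs[-1].append(event)
            else:
                runs.append([event])
        # A run is complete once the timer time has reached last event + gap.
        closed = [r for r in runs if r[-1][0] + self.gap <= fire_ts]
        still_open = [r for r in runs if r[-1][0] + self.gap > fire_ts]
        self.bag = [e for r in still_open for e in r]
        # Re-arm for the earliest open run, or clear the flag if nothing is left.
        self.timer_at = still_open[0][-1][0] + self.gap if still_open else None
        return closed


state = SessionState(gap=15)
for minute in (0, 5, 40, 45, 50):   # every element arrives before any timer fires
    state.process((minute, f"event@{minute}"))

sessions = []
while state.timer_at is not None:   # timers are delivered late, one at a time
    sessions.extend(state.on_timer(state.timer_at))
print(sessions)
```

Because all splitting happens in on_timer, the result does not depend on
whether the timer was delivered before or after the later elements. The
cost is the repeated sort mentioned above.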


Re: Stateful Pardo Question

Posted by Reuven Lax <re...@google.com>.
On Tue, Aug 4, 2020 at 1:08 PM jmackay@godaddy.com <jm...@godaddy.com>
wrote:

> So, after some additional digging, it appears that Beam does not
> consistently check for timer expiry before calling process. As a result,
> the watermark may have moved beyond your timer’s expiry, and if you’re
> counting on the timer callback having run by the time you set it for, that
> simply may NOT have happened when you are in DoFn.process(). You can “fix”
> the behavior by checking the watermark manually in process() and doing
> what you would normally do on timer expiry before proceeding. See my
> latest updated code reproducing the issue and showing the fix at
> https://github.com/randomsamples/pardo_repro.
>
> I would argue that users of this API will naturally expect timer callback
> semantics to guarantee that, when they are in process(), if the current
> watermark is past a timer’s expiry, the timer callback in question will
> have been called. Is there any reason why this isn’t happening? Am I
> misunderstanding something?
>

Timers do not expire synchronously with the watermark advancing. So if you
have a timer set for 12pm and the watermark advances past 12pm, that timer
is now eligible to fire, but might not fire immediately. Some other
elements may process before that timer fires.

There are multiple reasons for this, but one is that Beam does not
guarantee that watermark advancement is synchronous with element
processing. The watermark might advance suddenly while in the middle of
processing an element, or at any other time. This makes it impossible (or
at least, exceedingly difficult) to really provide the guarantee you
expected.

Reuven
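
As an illustration of the point above, here is a minimal sketch in plain Python. It is a toy model only, not Beam code: ToyRunner, its methods, and the 15-minute gap are all hypothetical names and values chosen to mirror the log in this thread. It shows an element being delivered while a timer is already eligible to fire, and the reset in process() replacing that timer before it ever fires.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToyRunner:
    """Hypothetical single-key runner model (NOT a Beam API)."""

    gap: int = 15                 # session gap in minutes (assumed)
    watermark: int = 0            # current event-time watermark, in minutes
    timer: Optional[int] = None   # pending event-time timer, if any
    log: List[Tuple[str, int, str]] = field(default_factory=list)

    def timer_state(self) -> str:
        if self.timer is None:
            return "no timer"
        if self.watermark >= self.timer:
            return "timer eligible but not fired"
        return "timer pending"

    def advance_watermark(self, to: int) -> None:
        # Watermark advancement is decoupled from element delivery.
        self.watermark = max(self.watermark, to)

    def process(self, ts: int) -> None:
        # Record what the timer looked like when the element arrived.
        self.log.append(("element", ts, self.timer_state()))
        self.timer = ts + self.gap  # resetting replaces the old timer

    def fire_eligible_timers(self) -> None:
        if self.timer is not None and self.watermark >= self.timer:
            self.log.append(("timer", self.timer, "fired"))
            self.timer = None


runner = ToyRunner()
for ts in [0, 5, 40, 45, 50]:      # event times from the log, in minutes
    runner.advance_watermark(ts)   # the watermark moves first...
    runner.process(ts)             # ...but the element is delivered before any timer
    runner.fire_eligible_timers()  # the runner only gets to timers afterwards
runner.advance_watermark(65)
runner.fire_eligible_timers()

for entry in runner.log:
    print(entry)
```

In this model the element at minute 40 sees a timer that is eligible but has not fired, and the only timer that ever fires is the last one, at minute 65, which matches the shape of the output reported in the thread.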

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
So, after some additional digging, it appears that Beam does not consistently check for timer expiry before calling process. As a result, the watermark may have moved beyond your timer’s expiry, and if you’re counting on the timer callback having run by the time you set it for, that simply may NOT have happened when you are in DoFn.process(). You can “fix” the behavior by checking the watermark manually in process() and doing what you would normally do on timer expiry before proceeding. See my latest updated code reproducing the issue and showing the fix at https://github.com/randomsamples/pardo_repro.

I would argue that users of this API will naturally expect timer callback semantics to guarantee that, when they are in process(), if the current watermark is past a timer’s expiry, the timer callback in question will have been called. Is there any reason why this isn’t happening? Am I misunderstanding something?
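
The workaround described above could look roughly like the following sketch. It is plain Python, not the code from the repro repository and not Beam API: SessionBuffer and its methods are hypothetical stand-ins for per-key state and a timer callback. It also assumes that elements for a key arrive in timestamp order, and it uses the incoming element's own timestamp as the stand-in for "the watermark", which is an assumption of this sketch.

```python
from typing import List, Optional


class SessionBuffer:
    """Hypothetical per-key state holder (stands in for Beam state and timers)."""

    def __init__(self, gap: int) -> None:
        self.gap = gap                        # quiescence gap, in minutes
        self.events: List[int] = []           # buffered event timestamps
        self.expiry: Optional[int] = None     # when the current session expires
        self.sessions: List[List[int]] = []   # emitted sessions

    def _flush(self) -> None:
        if self.events:
            self.sessions.append(self.events)
        self.events = []
        self.expiry = None

    def process(self, ts: int) -> None:
        # Workaround: do not assume the timer callback has already run.
        # If this element is at or past the stored expiry, close the old
        # session here before buffering the new element.
        if self.expiry is not None and ts >= self.expiry:
            self._flush()
        self.events.append(ts)
        self.expiry = ts + self.gap

    def on_timer(self, fired_at: int) -> None:
        # Ignore stale firings for an expiry that has since been pushed out.
        if self.expiry is not None and fired_at >= self.expiry:
            self._flush()


buf = SessionBuffer(gap=15)
for ts in [0, 5, 40, 45, 50]:   # event times from the log, in minutes
    buf.process(ts)
buf.on_timer(65)                # the single timer firing seen in the log
print(buf.sessions)
```

With the check in process(), the events split into two sessions at the 35-minute silence even though only one timer callback ever runs. Out-of-order input would need more care than this sketch takes.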

Re: Stateful Pardo Question

Posted by "jmackay@godaddy.com" <jm...@godaddy.com>.
Yeah, unless I am misunderstanding something. The output from my repro code shows the event timestamp and the context timestamp every time we process an event.

Receiving event at: 2000-01-01T00:00:00.000Z
Resetting timer to : 2000-01-01T00:15:00.000Z
Receiving event at: 2000-01-01T00:05:00.000Z
Resetting timer to : 2000-01-01T00:20:00.000Z <-- Shouldn’t the timer have fired before we processed the next event?
Receiving event at: 2000-01-01T00:40:00.000Z
Why didn’t the timer fire?
Resetting timer to : 2000-01-01T00:55:00.000Z
Receiving event at: 2000-01-01T00:45:00.000Z
Resetting timer to : 2000-01-01T01:00:00.000Z
Receiving event at: 2000-01-01T00:50:00.000Z
Resetting timer to : 2000-01-01T01:05:00.000Z
Timer firing at: 2000-01-01T01:05:00.000Z
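
To make the gap question concrete, the arithmetic on the event timestamps above can be checked with a short standard-library Python sketch. The 15-minute value for GAP is inferred from the "Resetting timer to" lines; the variable names are illustrative only.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=15)  # inferred from the "Resetting timer to" lines

# Event timestamps copied from the "Receiving event at" lines.
event_times = [
    "2000-01-01T00:00:00.000Z",
    "2000-01-01T00:05:00.000Z",
    "2000-01-01T00:40:00.000Z",
    "2000-01-01T00:45:00.000Z",
    "2000-01-01T00:50:00.000Z",
]


def parse(ts: str) -> datetime:
    # The trailing "Z" is matched literally; the result is a naive datetime.
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


parsed = [parse(t) for t in event_times]
gaps = [later - earlier for earlier, later in zip(parsed, parsed[1:])]
exceeds = [g >= GAP for g in gaps]

for (earlier, later), g, over in zip(zip(parsed, parsed[1:]), gaps, exceeds):
    print(earlier.time(), "->", later.time(), g, "exceeds gap" if over else "")
```

Only the step from 00:05 to 00:40 (35 minutes) exceeds the gap, which is exactly where the first annotation in the log expects a timer to have fired.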

Re: Stateful Pardo Question

Posted by Reuven Lax <re...@google.com>.
Are you sure that there is a 15-minute gap in your data?

On Mon, Aug 3, 2020 at 6:20 AM jmackay@godaddy.com <jm...@godaddy.com>
wrote:

> I am confused about the behavior of timers on a simple stateful pardo. I
> have put together a little repro here:
> https://github.com/randomsamples/pardo_repro
>
>
>
> I basically want to build something like a session window, accumulating
> events until quiescence of the stream for a given key and gap time, then
> output results. But it appears that the timer is not firing when the
> watermark has passed its expiration time, so the event stream is not being
> split as I would have expected. Would love some help getting this to work;
> the behavior is for a project I’m working on.
>