Posted to user@beam.apache.org by Amit Ziv-Kenet <am...@gmail.com> on 2019/03/06 12:27:26 UTC

Stateful transforms and dropped late data

Hello,

I'm encountering unexpected behavior in one of our pipelines, and I hope you
can help me make sense of it.

This is a streaming pipeline implemented with the Java SDK (2.10) and
running on Dataflow.

* In the pipeline, a FixedWindows transform is applied to the data with an
allowed lateness of a week (the exact value doesn't really matter).
* The windowed PCollection is then used in 2 separate branches: one a regular
GroupByKey, and the second a transform that makes use of State and Timers.
* In the stateful transform, incoming elements are added to a BagState
container until some condition on the elements is met, at which point the
elements in the BagState are dispatched downstream. A timer makes sure that
the BagState is dispatched even if the condition has not been met by the
time some timeout expires (a rough sketch of this pattern follows below).
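
To illustrate, here is a minimal sketch of the pattern (the names, the
buffer-size condition and the timeout value are made up for this example;
this is not our actual code):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

/** Buffers elements per key+window, dispatching on a condition or a timeout. */
class BufferUntilConditionFn extends DoFn<KV<String, String>, Iterable<String>> {

  private static final int MAX_BUFFER_SIZE = 100; // hypothetical condition

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("timeout")
  private final TimerSpec timeoutSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("timeout") Timer timeout) {
    buffer.add(ctx.element().getValue());
    // (Re)arm the timeout so the buffer is dispatched even if the condition
    // below is never met.
    timeout.offset(Duration.standardMinutes(5)).setRelative();

    // Materialize the buffer to check the dispatch condition.
    List<String> buffered = new ArrayList<>();
    buffer.read().forEach(buffered::add);
    if (buffered.size() >= MAX_BUFFER_SIZE) {
      ctx.output(buffered);
      buffer.clear();
    }
  }

  @OnTimer("timeout")
  public void onTimeout(
      OnTimerContext ctx, @StateId("buffer") BagState<String> buffer) {
    // Dispatch whatever has accumulated when the timeout fires.
    List<String> buffered = new ArrayList<>();
    buffer.read().forEach(buffered::add);
    if (!buffered.isEmpty()) {
      ctx.output(buffered);
    }
    buffer.clear();
  }
}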

Occasionally very late data enters the pipeline, with timestamps older than
the allowed lateness.
In these cases, the GroupByKey transform behaves as expected, and no panes
containing the late data are output downstream.
In the stateful transform, on the other hand, I see that these late elements
are processed and added to the BagState, but at a later point in time (e.g.
when the timer is triggered) the elements "disappear" from the BagState and
are no longer observable. This can happen multiple times, i.e. late elements
are added to the BagState and then disappear later on.

Is this expected behavior? Is there some sort of late data "garbage
collection" which eventually removes late elements?

Thank you for your help. I hope my description is clear enough.

Regards,
Amit.

Re: Stateful transforms and dropped late data

Posted by Kenneth Knowles <ke...@apache.org>.
OK, I've now read your pipeline. In the minimal pipeline, are you saying
that you see logs with "STATE <event>" but then never a corresponding
"STATE TIMER"?

There are a couple of other things to mention:

 - Reading data from Pubsub and then using WithTimestamps is not as good as
using the timestamp attribute control on PubsubIO, so you may be creating
late / dropped data.
 - You have a processing time timer, but when a window expires that timer
will not fire, so you also need to set an event time timer for the end of
the window + allowed lateness. (Sketches of both fixes are below.)
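
For the first point, something like this (a rough sketch; the topic and
attribute names are placeholders):

PCollection<PubsubMessage> messages =
    pipeline.apply(
        PubsubIO.readMessages()
            .fromTopic("projects/my-project/topics/events") // placeholder topic
            // Derive element timestamps (and hence the watermark) from a
            // message attribute, instead of shifting timestamps after the read.
            .withTimestampAttribute("event_time"));         // placeholder attribute

For the second point, an event time timer set for window expiry, so the state
is flushed before it is garbage collected (again a sketch; the constant must
match the allowed lateness on your Window transform):

private static final Duration ALLOWED_LATENESS = Duration.standardDays(7);

@TimerId("gc")
private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(
    ProcessContext ctx,
    BoundedWindow window,
    @TimerId("gc") Timer gc) {
  // Fire when the window expires: the last point at which the buffered
  // state can still be read and emitted.
  gc.set(window.maxTimestamp().plus(ALLOWED_LATENESS));
}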

Kenn

Re: Stateful transforms and dropped late data

Posted by Amit Ziv-Kenet <am...@gmail.com>.
Hello,

I've been able to produce a (relatively) minimal pipeline which reproduces
the behavior I've previously noticed in one of our production pipelines.

The minimal pipeline repo can be found here:
https://github.com/azk/late-samples-state
The actual pipeline is implemented in one big class here:
https://github.com/azk/late-samples-state/blob/master/src/main/java/late/samples/state/LateSamplesState.java

Basically, this pipeline consumes events from a Pubsub topic; the events are
very simple JSON objects. The events are:
* deserialized,
* assigned a timestamp extracted from the JSON payload,
* keyed, and then windowed into a one-minute fixed window with an allowed
lateness of 7 days, with processing time triggers and accumulating panes
(roughly the configuration sketched below).
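
The windowing is roughly this (a sketch from memory; the exact trigger delay
is in the linked class):

.apply(Window.<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(1)))
    .withAllowedLateness(Duration.standardDays(7))
    .triggering(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
    .accumulatingFiredPanes())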

The pipeline then splits into 2 branches:
1. A simple GroupByKey, followed by a logging step which records which
panes were fired.
2. A stateful ParDo, which collects elements in a BagState, logs what is
currently in the Bag, and sets a one-minute timer which also logs the
contents of the BagState when it fires.

When running this pipeline in Dataflow, and publishing a few "very late"
events through the script which is part of the repo:
https://github.com/azk/late-samples-state/blob/master/scripts/publish_events.py
the following behavior is observed:
1. The very late elements never appear after the GroupByKey step, and are
dropped as expected.
2. The very late elements are recorded in the stateful transform and added
to the BagState.
3. When the timer expires, the BagState is empty and the elements that were
previously in the Bag seem to disappear.

Is this expected behavior, or is there some subtle issue going on?
I'd be more than happy to dive deep into this with anyone interested in
looking into this behavior.

Thank you,
Amit.

Re: Stateful transforms and dropped late data

Posted by Amit Ziv-Kenet <am...@gmail.com>.
Hi Kenn,
Thank you for the explanation!

These points make total sense, which is why I was surprised by the
observed behavior: it breaks at least points 3 and 4.
I'll try to extract and share a minimal working example which demonstrates
this behavior.

Thank you,
Amit.

Re: Stateful transforms and dropped late data

Posted by Kenneth Knowles <ke...@apache.org>.
What you describe is not expected. Here are the relevant points, I think:

 - A window is expired when the watermark is past the end of the window +
allowed lateness
 - An element is droppable when it is associated with an expired window (see
the sketch after this list)
 - All droppable elements should be dropped before reaching the stateful
ParDo step
 - The state and processing time timers for a particular key+window pair
are garbage collected when the window is expired, because it is known that
nothing can cause that state to be read
 - A key+window state is not cleared until all event time timers have been
dispatched
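
In pseudo-Java, the first two points amount to something like this (a sketch
of the semantics only, not actual runner code):

// A window is expired once the watermark passes its end plus the allowed
// lateness; an element assigned to an expired window is droppable.
boolean isExpired(BoundedWindow window, Instant watermark, Duration allowedLateness) {
  return watermark.isAfter(window.maxTimestamp().plus(allowedLateness));
}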

Do these make sense? I'd love to see more detail of your pipeline code. One
thing to note is that an element being behind the watermark doesn't really
matter. What matters is how the watermark relates to its window.

Kenn