Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/16 17:48:30 UTC

Re: Carry forward state information from one window to next

Hi Reza,
I think looping timers have been working fine for my use case so far.
However, I am unable to DRAIN the job successfully because of @OnTimer. I
get the following error:

Error message from worker: java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@588b361b received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@630b25d that is
before the appropriate cleanup time 294247-01-10T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:392)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:455)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:478)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@54e513ce received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@630b25d that is
before the appropriate cleanup time 294247-01-10T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:392)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:455)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:478)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@6ed9a56b received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@630b25d that is
before the appropriate cleanup time 294247-01-10T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:392)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:455)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:478)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

NOTE: I have upgraded the Beam Java SDK to 2.23.0.

Thanks and Regards
Mohil
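
For reference, the cleanup time in this error, 294247-01-10T04:00:54.776Z, is at the very end of Beam's timestamp range. Beam defines `BoundedWindow.TIMESTAMP_MAX_VALUE` as `Long.MAX_VALUE` microseconds converted to milliseconds. The plain-Java check below decodes that value and shows that the reported cleanup time is exactly 1 ms after it. It is a sketch that uses `java.time` instead of Beam's Joda-Time types, and the class name is hypothetical. It only helps with reading the error and does not fix the failing drain.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, plain Java only (no Beam dependency): decodes the
// timestamps that appear in the "state cleanup timer" error.
class EndOfTimeCheck {
    // Same arithmetic as Beam's BoundedWindow.TIMESTAMP_MAX_VALUE:
    // Long.MAX_VALUE microseconds, truncated to whole milliseconds.
    static final long MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    static Instant timestampMaxValue() {
        return Instant.ofEpochMilli(MAX_MILLIS);
    }

    // The cleanup time reported in the error: 294247-01-10T04:00:54.776Z.
    static Instant reportedCleanupTime() {
        return OffsetDateTime.of(294247, 1, 10, 4, 0, 54, 776_000_000, ZoneOffset.UTC)
                .toInstant();
    }

    public static void main(String[] args) {
        System.out.println("TIMESTAMP_MAX_VALUE: " + timestampMaxValue());
        long gapMillis = reportedCleanupTime().toEpochMilli() - MAX_MILLIS;
        System.out.println("reported cleanup time - TIMESTAMP_MAX_VALUE = " + gapMillis + " ms");
    }
}
```

Running `main` prints the decoded maximum timestamp and a gap of 1 ms.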

On Thu, Jul 16, 2020 at 3:54 PM Reza Rokni <re...@google.com> wrote:

> Hi,
>
> One solution would be to allow late data, and then, within your stateful
> DoFn, emit the late items as a separate output tagged to mark them as a
> 'correction'. In the state and timer code you can tell that these items
> are late because their timestamps are older than the current watermark
> (note that you will need to track the watermark yourself by storing the
> 'last timer to fire' timestamp in state). It does add a lot more
> complexity...
>
> Cheers
> Reza
>
> On Thu, Jul 16, 2020 at 8:24 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hello Reza,
>>
>> I went through the looping timer blog post, followed by the Berlin Beam
>> Summit video. "Gap Filling - Hold and propagation" is exactly what I was
>> looking for. Thanks a lot, that was really helpful.
>> You mentioned that there are a few mitigation techniques for late-data
>> use cases. Can you suggest any?
>>
>> Thanks and regards
>> Mohil
>>
>> On Mon, Jul 13, 2020 at 11:19 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hello Reza,
>>> Thanks a lot for this information. I really appreciate it. Will
>>> definitely go through it and see if I can come up with something for my use
>>> case. Will keep you posted.
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>> On Sun, Jul 12, 2020 at 5:42 PM Reza Rokni <re...@google.com> wrote:
>>>
>>>> Hi Mohil,
>>>>
>>>> You may find this video from the Berlin Beam summit useful:
>>>>
>>>> https://www.youtube.com/watch?v=Y_HoNNU6b3I
>>>>
>>>> There is a section on holding and propagating values about 9 minutes in,
>>>> and you may also be interested in the looping timers section just before
>>>> it.
>>>>
>>>> Cheers
>>>> Reza
>>>>
>>>> On Sat, Jul 11, 2020 at 4:28 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> I see.
>>>>> Thanks a lot, Luke, for explaining.
>>>>> Let me think through this. Will get back to you if needed.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Fri, Jul 10, 2020 at 12:07 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> The solution only works if you can have an upstream DoFn compute the
>>>>>> data that you need to pass forward to a fixed number of future windows, so
>>>>>> the graph would look something like:
>>>>>> PCollection(Data) -> ParDo(ComputeSideInputData) -> PCollection(SideInputData) -----\
>>>>>>                                                                                     \----> ParDo(StatefulDoFn)
>>>>>>
>>>>>> There are generally two ways to do this:
>>>>>> 1) (easier) Have ComputeSideInputData emit what you want to carry
>>>>>> forward into the next window; the side input lookup will then perform a
>>>>>> join using the same window.
>>>>>> 2) (harder) Create your own WindowFn and make sure that its side input
>>>>>> mapping[1] looks up the past window.
>>>>>>
>>>>>> This only allows you to pass information forward a fixed number of
>>>>>> windows, so you can't have data from window A go on to B, C, and so on
>>>>>> indefinitely like a loop. It works if what you're computing only cares
>>>>>> about a limited amount of past data and anything older effectively becomes
>>>>>> noise (so recent data has a much higher weight than old data). The
>>>>>> alternative is to use the global window to store all the aggregated results
>>>>>> so far, but side inputs may produce stale data when the side input was
>>>>>> produced using multiple firings. Using the global window is best if having
>>>>>> the latest result is less important than having the majority of the data.
>>>>>>
>>>>>>
>>>>>> 1:
>>>>>> https://github.com/apache/beam/blob/6e77bca71f2e875f829d42be11a8d064badf8a86/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java#L48
>>>>>>
>>>>>> On Fri, Jul 10, 2020 at 10:53 AM Mohil Khare <mo...@prosimo.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Luke,
>>>>>>> As always, thanks for your prompt response.
>>>>>>>
>>>>>>> Can you please elaborate on "Or if the data you need to pass forward
>>>>>>> can be done as an earlier computation and then passed in as a side input
>>>>>>> using a window mapping fn that looks up the past window". I didn't quite
>>>>>>> understand this.
>>>>>>>
>>>>>>> Thanks again
>>>>>>> Mohil
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 10:43 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> Typically no, since all state is partitioned by key and window. You can
>>>>>>>> window into a larger window, perform your stateful computation, and then
>>>>>>>> re-window into the shorter one after it. Or if the data you need to pass
>>>>>>>> forward can be done as an earlier computation and then passed in as a side
>>>>>>>> input using a window mapping fn that looks up the past window.
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 10:30 AM Mohil Khare <mo...@prosimo.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I am using Beam on Dataflow with Java SDK 2.19.0.
>>>>>>>>> I have a use case where I need to collect some messages in a short
>>>>>>>>> window of a few seconds to 1 minute, update state (Beam stateful
>>>>>>>>> processing), carry this state information forward to the next window,
>>>>>>>>> and use it to initialize the state of the next window, i.e. something
>>>>>>>>> like the following:
>>>>>>>>>
>>>>>>>>> [image: image.png]
>>>>>>>>>
>>>>>>>>> Since the windows are very short, I don't want to write to an
>>>>>>>>> external DB on window expiry and read it back in the next window. Is it
>>>>>>>>> possible to carry state information forward from one window to the next?
>>>>>>>>> Or do you have any other suggestions for achieving this?
>>>>>>>>>
>>>>>>>>> Thanks and regards
>>>>>>>>> Mohil
>>>>>>>>>
>>>>>>>>
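
Luke's "easier" option above can be modelled without Beam to show the mechanics. The upstream step stamps each window's result into the following fixed window, so a lookup keyed by the element's own window returns the previous window's result. The sketch below is plain Java, not the Beam API. The class and method names are hypothetical, windows are assumed to be fixed, zero-offset and identified by their start time in milliseconds, and a `HashMap` stands in for the side input.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not the Beam API) of carrying a per-window result forward
// by re-stamping it into the next fixed window. Names are hypothetical.
class CarryForwardModel {
    private final long windowSizeMillis;
    // Stand-in for the side input: window start (ms) -> value carried into that window.
    private final Map<Long, Long> sideInput = new HashMap<>();

    CarryForwardModel(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Start of the zero-offset fixed window that contains tsMillis.
    long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, windowSizeMillis);
    }

    // "ComputeSideInputData": publish the result computed for the window that
    // contains tsMillis, stamped into the following window.
    void publish(long tsMillis, long result) {
        sideInput.put(windowStart(tsMillis) + windowSizeMillis, result);
    }

    // "StatefulDoFn": same-window side-input lookup used to initialize state;
    // falls back to defaultValue when nothing was carried into this window.
    long initialState(long tsMillis, long defaultValue) {
        return sideInput.getOrDefault(windowStart(tsMillis), defaultValue);
    }

    public static void main(String[] args) {
        CarryForwardModel model = new CarryForwardModel(60_000L);
        model.publish(30_000L, 42L);                           // result of window [0 s, 60 s)
        System.out.println(model.initialState(90_000L, 0L));   // looked up in [60 s, 120 s)
        System.out.println(model.initialState(150_000L, 0L));  // nothing carried into [120 s, 180 s)
    }
}
```

Running `main` prints 42 and then 0. A value published for [0 s, 60 s) is visible only in [60 s, 120 s). Reaching the window after that requires publishing again, which matches Luke's point that this approach only carries information a fixed number of windows forward.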

Posted by Mohil Khare <mo...@prosimo.io>.
Hello Reza,

Thanks for your prompt response.
I see. Are there any plans to support this in an upcoming release?
I prefer doing a DRAIN rather than a CANCEL for my job so that it can
gracefully finish all operations and, more importantly, save my checkpoints
in BQ.

Thanks and Regards
Mohil

On Sun, Aug 16, 2020 at 5:14 PM Reza Rokni <re...@google.com> wrote:

> Hi Mohil,
>
> At the moment, the Drain feature is not available for looping timers
> (please see the note on 'Additional common features not yet part of the
> Beam Model' in the capability matrix):
>
> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-what
>
> You will need to use 'click to expand' to see the detailed notes.
>
> +Kenn Knowles, looking at the matrix, it's really hard to tell that there is
> useful info in the details. Maybe the ~ should also be made visibly
> clickable to make this more obvious. I'm also not sure whether there is a
> way to provide a better error for the end user.
>
> Reza

Posted by Reza Rokni <re...@google.com>.
Hi Mohil,

At the moment, the Drain feature is not available for looping timers
(please see the note on 'Additional common features not yet part of the
Beam Model' in the capability matrix):

https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-what

You will need to use 'click to expand' to see the detailed notes.

+Kenn Knowles, looking at the matrix, it's really hard to tell that there is
useful info in the details. Maybe the ~ should also be made visibly
clickable to make this more obvious. I'm also not sure whether there is a
way to provide a better error for the end user.

Reza
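
For readers who have not seen the looping-timer pattern, it can be modelled in plain Java. The sketch below is not the Beam API, and its names are hypothetical. Fields stand in for a `ValueState` and an event-time timer. Every firing emits the held value for its interval and then re-arms itself one interval later, so intervals with no input still produce output. In Beam, the re-arming step would be a call to `Timer.set` inside the `@OnTimer` method. Nothing in the loop ever stops it on its own. The model does not show how Dataflow handles such a timer on Drain.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Beam API) of a looping event-time timer that
// holds and propagates the last value for one key. Names are hypothetical.
class LoopingTimerModel {
    private final long intervalMillis;
    private Long heldValue = null;        // stand-in for ValueState<Long>
    private Long nextTimerMillis = null;  // stand-in for an event-time Timer
    final List<String> output = new ArrayList<>();

    LoopingTimerModel(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Like @ProcessElement: remember the latest value and arm the timer once,
    // at the end of the interval that contains the first element.
    void processElement(long tsMillis, long value) {
        heldValue = value;
        if (nextTimerMillis == null) {
            nextTimerMillis = tsMillis - Math.floorMod(tsMillis, intervalMillis) + intervalMillis;
        }
    }

    // Advance the event-time watermark; each due timer emits the held value
    // and re-arms itself one interval later (the "loop").
    void advanceWatermark(long watermarkMillis) {
        while (nextTimerMillis != null && nextTimerMillis <= watermarkMillis) {
            output.add(nextTimerMillis + "=" + heldValue);
            nextTimerMillis += intervalMillis;
        }
    }

    public static void main(String[] args) {
        LoopingTimerModel timer = new LoopingTimerModel(60_000L);
        timer.processElement(10_000L, 5L);
        timer.advanceWatermark(200_000L);  // no input after 10 s, yet every interval fires
        System.out.println(timer.output);
    }
}
```

Running `main` prints `[60000=5, 120000=5, 180000=5]`. The value 5 is propagated into intervals that received no input, which is the hold-and-propagate behaviour discussed earlier in the thread.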