Posted to user@beam.apache.org by Desmond F <de...@gmail.com> on 2021/11/10 14:21:48 UTC

How to implement a predictive use-case (Python)

Hi all,

We have many clients connected via WebSockets through API Gateway on
AWS. These clients periodically submit events of various types; each
event contains a sessionID (generated by the client), and a session
logically ends when there is no activity for a specified duration.
We have a sequence model (an RNN written in PyTorch) that needs to
send a prediction back to the client for each event processed. The
input to the model is the raw events, in the order they were
generated.

I believe the pipeline looks something like this: Source -> Group By
Session ID -> Accumulate Events and Make Predictions
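
A toy Python sketch of that shape, with beam.Create and a trivial
"prediction" standing in for our real streaming source and model call:

import apache_beam as beam

events = [{'sessionID': 's1', 'type': 'click'},
          {'sessionID': 's1', 'type': 'scroll'}]

with beam.Pipeline() as p:
    (p
     | 'Source' >> beam.Create(events)  # really a streaming source
     | 'KeyBySessionID' >> beam.Map(lambda e: (e['sessionID'], e))
     | 'GroupBySession' >> beam.GroupByKey()
     # Placeholder "prediction": just counts the session's events.
     | 'Predict' >> beam.MapTuple(lambda sid, evs: (sid, len(list(evs)))))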

1. In your opinion, does this use-case fit naturally with the Apache
Beam programming model?
2. We'd like the session to expire after a predefined duration
(processing time) of no activity. How can this be achieved?
3. Before session expiry, we'd like to hand the session details to
another task that batches sessions and writes them to S3.
4. "Accumulate Events and Make Predictions" should be a stateful
function that builds the session and, for each event, calls the
model with the historical events. What is the best way to achieve
that?
5. How can we ensure that events for each key arrive in the same
order they were produced? Is that already guaranteed?
6. The model needs to react to events in real time (i.e., in less
than 500 ms). Do you believe this is achievable?

Thanks in advance,
Desmond

Re: How to implement a predictive use-case (Python)

Posted by Desmond F <de...@gmail.com>.
Also, is it possible to react in real time to each event after a
GroupByKey? In the use-case above, we need to react to each event but
still aggregate the events and sessionize them. I went over windowing
/ triggers / GroupByKey, and so far I couldn't figure out a way to do
it.

Re: How to implement a predictive use-case (Python)

Posted by Luke Cwik <lc...@google.com>.
All runners effectively track the lower-bound watermark across the
upstream sources, so a delay in some of the data can hold back the
reported watermark and therefore delay the buffered data. There is
currently no notion of a per-key watermark, which prevents using
event time for this. It is possible, though, to build a stateful DoFn
that effectively tracks a per-key watermark using processing-time
timers, without relying on Beam's watermark advancement and event time.
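
A minimal Python sketch of that idea, assuming (sessionID, event)
input and an illustrative 60-second inactivity gap (the names and the
gap size are mine, not anything standard):

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, TimerSpec, on_timer)
from apache_beam.utils.timestamp import Duration, Timestamp

GAP = Duration(seconds=60)  # illustrative inactivity gap

class PerKeyExpiryFn(beam.DoFn):
    """Buffers events per key and emits the session after a period of
    inactivity, using a processing-time timer rather than the
    event-time watermark."""
    EVENTS = BagStateSpec('events', PickleCoder())
    EXPIRY = TimerSpec('expiry', TimeDomain.REAL_TIME)

    def process(self,
                element,
                events=beam.DoFn.StateParam(EVENTS),
                expiry=beam.DoFn.TimerParam(EXPIRY)):
        key, event = element
        events.add(event)
        # Re-setting the timer overwrites it, so it only fires after
        # GAP of inactivity for this key.
        expiry.set(Timestamp.now() + GAP)

    @on_timer(EXPIRY)
    def on_expiry(self, events=beam.DoFn.StateParam(EVENTS)):
        session = list(events.read())
        events.clear()
        yield session  # the completed session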

On Wed, Nov 10, 2021 at 11:32 PM Desmond F <de...@gmail.com> wrote:

> Hi Luke, thanks for your reply.
>
> I was confused about the semantics of GroupByKey, Windowing and
> ParDo: I initially thought ParDo requires a GroupByKey earlier in
> the pipeline, and I believe that if you use GroupByKey then all the
> events are buffered before they are passed to the ParDo.
>
> Another issue that we are facing:
>
> The events' timestamps are generated by many different clients
> (personal devices) whose clocks are not synchronised. Is there a way
> to use event time on a per-key basis and use it to detect session
> expiry? For us, a session expires when there are no additional
> events for a specified period.
>
> Thanks!
> Des.

Re: How to implement a predictive use-case (Python)

Posted by Desmond F <de...@gmail.com>.
Hi Luke, thanks for your reply.

I was confused about the semantics of GroupByKey, Windowing and
ParDo: I initially thought ParDo requires a GroupByKey earlier in
the pipeline, and I believe that if you use GroupByKey then all the
events are buffered before they are passed to the ParDo.

Another issue that we are facing:

The events' timestamps are generated by many different clients
(personal devices) whose clocks are not synchronised. Is there a way
to use event time on a per-key basis and use it to detect session
expiry? For us, a session expires when there are no additional
events for a specified period.

Thanks!
Des.

Re: How to implement a predictive use-case (Python)

Posted by Luke Cwik <lc...@google.com>.
To answer the follow-up: you can use a stateful DoFn, which relies on
the runner to route the records for each key to the appropriate
stateful DoFn instance, or you can control when output is produced
using the trigger on the GroupByKey.
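
A sketch of the trigger route in Python (the repeated processing-time
trigger and the 60-second delay are illustrative choices, not settings
from your use-case):

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

# Fires 60s of processing time after the first element of each pane;
# ACCUMULATING means every firing re-emits all events seen so far.
sessions = (
    events  # a PCollection of (sessionID, event) pairs
    | beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
    | beam.GroupByKey())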

Take a look at:
https://beam.apache.org/blog/timely-processing/
https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam/45911664

and some of the examples:
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
for an example of using session windows.
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
for an example of stateful processing.

There are also more learning resources:
https://beam.apache.org/documentation/resources/learning-resources/


Re: How to implement a predictive use-case (Python)

Posted by Luke Cwik <lc...@google.com>.
On Wed, Nov 10, 2021 at 6:22 AM Desmond F <de...@gmail.com> wrote:

> 1. In your opinion, does this use-case fit naturally with the Apache
> Beam programming model?
>
Yes.

> 2. We'd like the session to expire after a predefined duration
> (processing time) of no activity. How can this be achieved?
>
You can do this with a stateful DoFn that stores all the accumulated
data, using a processing-time timer to expire the results. Have you
considered using event time instead? Session windows with a gap size
seem to be a good fit and will also handle cases where the source
producing the events, or internal processing within the pipeline,
gets delayed.
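
If event time does end up being usable for your data, the windowing
itself is compact; a sketch assuming a 60-second gap and
(sessionID, event) input:

import apache_beam as beam
from apache_beam import window

# Event-time sessions that close after a 60-second gap of inactivity;
# with the default trigger, each session is emitted once the watermark
# passes the end of its window.
sessions = (
    events  # a PCollection of (sessionID, event) pairs
    | beam.WindowInto(window.Sessions(gap_size=60))
    | beam.GroupByKey())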

> 3. Before session expiry, we'd like to hand the session details to
> another task that batches sessions and writes them to S3.

Yes, the processing-time timer can emit the results when it fires. If
using event time, the trigger on the GroupByKey can be used to control
when results are produced. For example, if you use the default
trigger, which produces results when the window is done, the
downstream transforms see all the results for each session as the
session expires.
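
Continuing that sketch, the grouped sessions can then flow into the
S3 writer as each window closes. fileio.WriteToFiles with an s3://
path is one option if the AWS extras (apache-beam[aws]) are
installed; the bucket name, the JSON encoding, and the events being
JSON-serializable are assumptions on my part:

import json
import apache_beam as beam
from apache_beam.io import fileio

# One JSON line per completed session, under a hypothetical bucket.
_ = (sessions  # (sessionID, [events]) pairs from the GroupByKey above
     | beam.Map(lambda kv: json.dumps({'sessionID': kv[0],
                                       'events': list(kv[1])}))
     | fileio.WriteToFiles(
         path='s3://my-bucket/sessions/',  # hypothetical bucket
         sink=lambda dest: fileio.TextSink()))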

4. The "Accumulate Events and Make Predictions" should be a stateful
> function that builds the session and for each event it should call the
> model with the historical events, what is the best way to achieve
> that?
>
With the stateful DoFn you can do this as the events come in. Relying
on session windows with event time may not fit this per-event
approach as well.
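
A minimal sketch of that per-event pattern, assuming a
predict(events) helper that wraps your PyTorch model (the helper, the
pickled coder, and the sort by a client-side 'timestamp' field are
all illustrative):

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.userstate import BagStateSpec

class AccumulateAndPredict(beam.DoFn):
    """Buffers each key's events and runs the model over the full
    history every time a new event arrives."""
    EVENTS = BagStateSpec('events', PickleCoder())

    def process(self, element, events=beam.DoFn.StateParam(EVENTS)):
        key, event = element
        events.add(event)
        # Arrival order isn't guaranteed (see question 5), so order
        # the history by the client-generated timestamp first.
        history = sorted(events.read(), key=lambda e: e['timestamp'])
        yield key, predict(history)  # hypothetical model wrapper

This would be applied after keying by sessionID, e.g.
keyed | beam.ParDo(AccumulateAndPredict()).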

> 5. How can we ensure that events for each key arrive in the same
> order they were produced? Is that already guaranteed?
>
Normally this isn't guaranteed, but some runners do provide this
guarantee automatically. I believe there is a page on the blog that
lists different runners and their time-sorting characteristics. You
could also take a look at runners that support @RequiresTimeSortedInput.


> 6. The model needs to react to events in real time (i.e., in less
> than 500 ms). Do you believe this is achievable?
>
This depends on the source that is publishing the data and on the
runner, as both impact latency. You can achieve low latencies using
the direct runner (a local single-process runner), assuming the
source is configured to checkpoint often so that ingested data can be
processed downstream. Other distributed runners are typically on the
order of low seconds.


>
> Thanks in advance,
> Desmond
>