Posted to user@beam.apache.org by Johannes Frey <jf...@data-maniacs.ai> on 2021/09/10 13:06:48 UTC

[Question] Testing interaction of streaming main input and slowly updating side input

Hi everybody,

I'm currently having a hard time wrapping my head around streaming
data processing.

Scenario:
I have a main stream of data (orders entering the system) that is
processed by the pipeline, and one processing step needs a side input
to enrich the data.

This side input should refresh itself once a day. I built it following
the "Slowly updating global window side inputs" example in the
documentation:
https://beam.apache.org/documentation/patterns/side-inputs/
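
For reference, the documented pattern loads the data once per tick of an unbounded `GenerateSequence`, windows the results into the global window with `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())` and `discardingFiredPanes()`, and exposes them through `View.asSingleton()`. The sketch below is a simplified plain-Java model of that pane behavior, not Beam code. `GlobalWindowPaneModel` is a hypothetical name, and the model assumes every trigger firing sees exactly one new snapshot. It shows why the pattern discards fired panes: in accumulating mode, the second pane would already hold two snapshots, which a singleton view would reject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Beam dependency) of the panes produced by
// a repeatedly firing trigger in the single global window.
// Assumption: each firing sees exactly one newly loaded snapshot.
final class GlobalWindowPaneModel {

  // Pane contents at firing number `firing` (0-based).
  // Discarding mode: only the snapshot that arrived since the previous firing.
  // Accumulating mode: every snapshot that has arrived in the window so far.
  static <T> List<T> paneAtFiring(List<T> snapshots, int firing, boolean accumulating) {
    if (accumulating) {
      return new ArrayList<>(snapshots.subList(0, firing + 1));
    }
    return List.of(snapshots.get(firing));
  }

  // Mirrors the contract of a singleton view: exactly one element per pane.
  static <T> T asSingleton(List<T> pane) {
    if (pane.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one element in the pane, got " + pane.size());
    }
    return pane.get(0);
  }
}
```

With snapshots `day1`, `day2`, `day3`, the discarding pane at firing 2 is `[day3]`, while the accumulating pane at firing 1 is `[day1, day2]` and fails the singleton check.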

To make sure everything works as expected, I would like to write a
JUnit test that verifies that the side input updates regularly and that
newly arriving data actually receives the updated side-input values.
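
One way to state what such a test should check is as an oracle over a fake clock: given the refresh schedule, which side-input version each arriving order should be enriched with. In Beam the clock would typically be driven through `TestStream`'s builder (for example `advanceProcessingTime`), but the sketch below uses no Beam API. `RefreshOracle` is a hypothetical helper. It assumes that refresh k happens exactly at `firstRefreshMs + k * periodMs` and that elements arriving before the first refresh wait for it. A real runner may make an update visible somewhat later, so assertions right at a refresh boundary could be flaky.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test oracle (plain Java, not Beam): which side-input version
// (0 = first load) an element arriving at a given fake-clock time should see.
// Assumptions: refresh k happens at firstRefreshMs + k * periodMs, and an
// element arriving before the first refresh is held until it happens.
final class RefreshOracle {

  static long expectedVersion(long arrivalMs, long firstRefreshMs, long periodMs) {
    if (periodMs <= 0) {
      throw new IllegalArgumentException("periodMs must be positive");
    }
    if (arrivalMs < firstRefreshMs) {
      return 0; // waits for the first load, then uses it
    }
    return (arrivalMs - firstRefreshMs) / periodMs;
  }

  // Expected versions for a whole sequence of arrivals, in order.
  static List<Long> expectedVersions(long[] arrivalsMs, long firstRefreshMs, long periodMs) {
    List<Long> out = new ArrayList<>();
    for (long t : arrivalsMs) {
      out.add(expectedVersion(t, firstRefreshMs, periodMs));
    }
    return out;
  }
}
```

For the once-a-day refresh in the scenario, `periodMs` would be `86_400_000`; a test can then compare the versions actually used by the enrich step against `expectedVersions`.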

Here is the code for what I am trying to do: https://pastebin.com/8mtPKTcv

What I observe is:
- The side input is triggered.
- Processing starts.
- All elements are processed up to the step that actually needs the
side input; there, processing blocks.
- The side input is triggered again.
- Processing remains blocked.

Could anyone please explain where I'm going wrong? I have already tried
several things, such as applying fixed windows to the main stream, but
with no luck so far, and I couldn't find much information via Google.

I also printed ctx.timestamp() in the processing steps; here is the output:
2021-09-10T13:03:10.021Z sideloaded elements: 1767
2021-09-10T13:03:07.072Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:07.073Z Filter called
2021-09-10T13:03:30.022Z sideloaded elements: 1767
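
The printed values are element timestamps (`ctx.timestamp()` returns the timestamp of the current element), and they can be compared directly. The two side-input firings are 20.001 s apart, and the last "Filter called" element carries a timestamp about 2.9 s earlier than the first side-input firing. A small `java.time` helper (hypothetical name `LogIntervals`) that checks these intervals, with the values copied from the log:

```java
import java.time.Duration;
import java.time.Instant;

// Interval in milliseconds between two ISO-8601 instants as printed in the log.
final class LogIntervals {
  static long millisBetween(String from, String to) {
    return Duration.between(Instant.parse(from), Instant.parse(to)).toMillis();
  }
}
```

`millisBetween("2021-09-10T13:03:10.021Z", "2021-09-10T13:03:30.022Z")` gives 20001, and `millisBetween("2021-09-10T13:03:07.073Z", "2021-09-10T13:03:10.021Z")` gives 2948.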

Your help would be really appreciated.

Thanks and regards

Re: [Question] Testing interaction of streaming main input and slowly updating side input

Posted by Johannes Frey <jf...@data-maniacs.ai>.
Hi Siyu,

Thank you for your response. I'm aware of that. The thing is, even if it
runs forever, the trigger should still fire every period, if I'm not mistaken.

So I still don't understand why the main stream blocks after the filter
step and the enrich step is never executed.

I would expect each firing of the side-input trigger to provide a value
to the enrich step so that it can do its work, but for some reason the
step seems to keep waiting for the side input.

Or am I misunderstanding something?

Thank you very much and regards
Johannes

On Mon 13. Sep 2021 at 22:49, Siyu Lin <si...@unity3d.com> wrote:

> Hi Johannes,
>
> L20 in your code will make it run forever so it is better to put a finite
> number there, like
> `GenerateSequence.from(0).to(1).withRate(1, Duration.standardSeconds(2))`
>
> -siyu

Re: [Question] Testing interaction of streaming main input and slowly updating side input

Posted by Siyu Lin <si...@unity3d.com>.
Hi Johannes,

Line 20 in your code will make it run forever, so it is better to put a finite number there, like 
`GenerateSequence.from(0).to(1).withRate(1, Duration.standardSeconds(2))`
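
The suggestion bounds the tick source. Assuming the documented `GenerateSequence` semantics (`from` inclusive, `to` exclusive), `from(0).to(1)` emits the single value 0, so in the slowly-updating pattern the side input would be loaded once; each additional value in the range means one more load before the sequence ends. The stand-in below is plain Java, not Beam, and `FiniteTicks` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java stand-in (not Beam) for the values a finite
// GenerateSequence.from(from).to(to) emits: `from` inclusive, `to` exclusive.
// In the slowly-updating side-input pattern, each value triggers one reload.
final class FiniteTicks {
  static List<Long> values(long from, long to) {
    return LongStream.range(from, to).boxed().collect(Collectors.toList());
  }

  // Number of side-input loads the finite sequence would cause (one per value).
  static long loadCount(long from, long to) {
    return Math.max(0L, to - from);
  }
}
```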

-siyu
