Posted to user@beam.apache.org by "Ifat Afek (Nokia)" <if...@nokia.com> on 2023/01/03 08:15:39 UTC

Join streams with different frequencies

Hi,

We are trying to implement the following use case:
We have a stream of DataX events that arrive every 5 minutes and require some processing. Each event holds data for a specific non-unique ID (we keep getting updated data for each ID). There might be up to 1,000,000 IDs.
In addition, there is a stream of DataY events for some of these IDs that arrive at a variable frequency: one could arrive after a minute and the next only after 5 hours.
We would like to join the current DataX and latest DataY events by ID (and process only IDs that have both DataX and DataY events).

We thought of holding the DataY events per ID as state in a global window, and then using it as a side input for filtering the DataX event stream. The state should hold the latest (by timestamp) DataY event that arrived.
The problem is: if we are using discardingFiredPanes(), then each DataY event is fired only once and cannot be reused later on for filtering. On the other hand, if we are using accumulatingFiredPanes(), then a list of all DataY events that arrived is fired.
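The pane behaviour described above can be modelled in plain Java. The sketch below does not use the Beam SDK, and `DataY`'s fields and the class names are hypothetical. It shows what each firing of a repeatedly firing trigger would contain for one ID in the global window under each accumulation mode. It also shows how reducing an accumulated pane to its newest element by timestamp yields the single value the side input is meant to hold. In Beam itself, `Latest.perKey()` performs that per-key reduction using element timestamps.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical shape of a DataY event; the thread does not define its fields.
record DataY(String id, long timestampMillis, String payload) {}

// Plain-Java model (no Beam SDK) of the panes a repeatedly firing trigger emits
// for ONE key in the global window, under each accumulation mode.
final class GlobalWindowPanes {
  private final boolean accumulating;
  private final List<DataY> pending = new ArrayList<>();

  GlobalWindowPanes(boolean accumulating) {
    this.accumulating = accumulating;
  }

  void add(DataY y) {
    pending.add(y);
  }

  // Returns the contents of one pane (one trigger firing).
  List<DataY> fire() {
    List<DataY> pane = new ArrayList<>(pending);
    if (!accumulating) {
      // discardingFiredPanes(): fired elements are forgotten, so a later pane
      // no longer contains them and they cannot be reused for filtering.
      pending.clear();
    }
    // accumulatingFiredPanes(): the pane holds every element seen so far.
    return pane;
  }

  // Reduces a pane to the element with the greatest event timestamp, i.e. the
  // single "latest DataY" the side input is supposed to hold.
  static Optional<DataY> latest(List<DataY> pane) {
    return pane.stream().max(Comparator.comparingLong(DataY::timestampMillis));
  }
}
```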

Are we missing something? What is the best practice for combining two streams when one of them has a variable frequency?

Thanks,
Ifat


Re: Join streams with different frequencies

Posted by "Ifat Afek (Nokia)" <if...@nokia.com>.
Hi Jan,

Thanks, I will check the options you suggested.

Best Regards,
Ifat

From: Jan Lukavský <je...@seznam.cz>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, 4 January 2023 at 18:52
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Join streams with different frequencies


Hi,

The general pattern here would be to map both PCollections to a common type, e.g. PCollection<KV<TupleTag, T>>, and then flatten them into one PCollection to which you apply a stateful DoFn (see [1]). You would hold the DataY value for each ID in state and match it against events coming from the DataX stream. Assuming you do not need to ensure that each DataX event is matched against the *exactly preceding* DataY event (in event time), this works fine.
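A minimal sketch of that pattern follows. It is modelled in plain Java rather than with the Beam SDK so that it runs on its own. The names `Source`, `Tagged`, `Joined` and `LatestYJoin` are hypothetical: the `Map` stands in for per-key `ValueState`, and `processElement` stands in for the stateful DoFn's `@ProcessElement` method. Because elements are handled in arrival order, a DataX event is matched against whatever DataY has been seen so far for its ID. That is the "not necessarily the exactly preceding event" caveat mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Which input stream an element came from; plays the role of the TupleTag in KV<TupleTag, T>.
enum Source { DATA_X, DATA_Y }

// Hypothetical common type after mapping both streams and flattening them (keyed by ID).
record Tagged(String id, Source source, long timestamp, String payload) {}

// Hypothetical join output: one DataX event paired with the DataY value held for its ID.
record Joined(String id, String xPayload, String yPayload) {}

// Plain-Java model (no Beam SDK) of the stateful DoFn's per-ID logic.
final class LatestYJoin {
  // Stands in for a per-key ValueState holding the newest DataY; Beam scopes state per key and window.
  private final Map<String, Tagged> latestY = new HashMap<>();

  // Called once per element, in arrival order (not necessarily event-time order).
  List<Joined> processElement(Tagged e) {
    List<Joined> out = new ArrayList<>();
    if (e.source() == Source.DATA_Y) {
      Tagged held = latestY.get(e.id());
      // Keep the DataY with the greatest event timestamp seen so far.
      if (held == null || e.timestamp() >= held.timestamp()) {
        latestY.put(e.id(), e);
      }
    } else {
      Tagged held = latestY.get(e.id());
      // Inner join: an ID with no DataY seen yet produces no output.
      if (held != null) {
        out.add(new Joined(e.id(), e.payload(), held.payload()));
      }
    }
    return out;
  }
}
```

Only one DataY per ID is ever kept, so the state size is bounded by the number of IDs rather than by the number of DataY events.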

If you do need to be sure that each DataX event is matched against the latest DataY event in event time (most of the time you likely don't have this requirement), you can:

 a) buffer DataX in a BagState and use timers to flush the state after some timeout, or

 b) use @DoFn.RequiresTimeSortedInput [2] (if your runner supports it), which does the buffering for you and passes the elements into the @ProcessElement method sorted by event timestamp
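Both options come down to holding elements back until the watermark indicates that their event-time predecessors have arrived, then processing them in timestamp order. The sketch below models that in plain Java without the Beam SDK, and all names are hypothetical. The per-ID list stands in for a BagState, `onWatermark` stands in for an event-time timer, and the sort stands in for what @DoFn.RequiresTimeSortedInput would do for you. Treating a DataY with the same timestamp as a DataX as "preceding" it is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Kind { X, Y }

// Hypothetical flattened element: a DataX or DataY event for one ID.
record Event(String id, Kind kind, long timestamp, String payload) {}

record Match(String id, String xPayload, String yPayload) {}

// Plain-Java model (no Beam SDK): buffer per ID, release once the watermark passes,
// then process in event-time order so each DataX sees its preceding DataY.
final class SortedBufferJoin {
  private final Map<String, List<Event>> buffered = new HashMap<>(); // like BagState per ID
  private final Map<String, Event> latestY = new HashMap<>();       // like ValueState per ID

  void add(Event e) {
    buffered.computeIfAbsent(e.id(), k -> new ArrayList<>()).add(e);
  }

  // Models a timer firing as the watermark advances to the given value.
  List<Match> onWatermark(long watermark) {
    List<Match> out = new ArrayList<>();
    for (Map.Entry<String, List<Event>> entry : buffered.entrySet()) {
      List<Event> ready = new ArrayList<>();
      List<Event> stillPending = new ArrayList<>();
      for (Event e : entry.getValue()) {
        if (e.timestamp() <= watermark) {
          ready.add(e);
        } else {
          stillPending.add(e);
        }
      }
      // Event-time order; on equal timestamps a DataY is processed before the DataX.
      ready.sort(Comparator.comparingLong(Event::timestamp)
          .thenComparingInt(e -> e.kind() == Kind.Y ? 0 : 1));
      for (Event e : ready) {
        if (e.kind() == Kind.Y) {
          latestY.put(e.id(), e);
        } else {
          Event y = latestY.get(e.id());
          if (y != null) {
            out.add(new Match(e.id(), e.payload(), y.payload()));
          }
        }
      }
      entry.setValue(stillPending); // elements ahead of the watermark wait for a later firing
    }
    return out;
  }
}
```

For example, suppose a DataX at timestamp 200 arrives before a DataY at timestamp 100. A flush at watermark 250 still pairs them, whereas the arrival-order approach would have found no DataY for that DataX. The cost is latency: nothing is emitted until the watermark moves.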

In both cases it is worth deciding how you want to handle late data (i.e. data that arrives after the watermark, or after an element has already been matched, so that the match was made against the wrong element). Solution (b) simply drops late elements (which might not be what you want), or introduces latency defined by allowedLateness. Another option would be to implement retractions and process them downstream. I implemented something like that in [3].
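The drop-versus-latency trade-off for option (b) can be sketched as a small rule. This is a plain-Java model, not Beam's implementation. It assumes that an element is dropped once the watermark is past its timestamp plus the allowed lateness, and that a sorted buffer must wait for that same point before emitting. The exact boundary Beam uses may differ, and the class name is hypothetical.

```java
// Plain-Java model (no Beam SDK) of the two behaviours described above for option (b).
// Assumption: "too late" means the watermark is past timestamp + allowed lateness.
final class LateDataPolicy {
  private final long allowedLatenessMillis;

  LateDataPolicy(long allowedLatenessMillis) {
    if (allowedLatenessMillis < 0) {
      throw new IllegalArgumentException("allowed lateness must be non-negative");
    }
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // True if an element arriving while the watermark is at `watermark` would be dropped.
  boolean isDropped(long timestamp, long watermark) {
    return watermark > timestamp + allowedLatenessMillis;
  }

  // Watermark the buffer waits for before emitting an element in sorted order:
  // a larger allowed lateness means fewer drops but proportionally more latency.
  long releaseAt(long timestamp) {
    return timestamp + allowedLatenessMillis;
  }
}
```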

Hope that helps,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

[2] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

[3] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java
On 1/4/23 16:28, Ifat Afek (Nokia) wrote:
Thanks Sören,

I already saw your Stack Overflow question while trying to find a solution 😊
I prefer a solution that does not involve an external cache like Redis, if possible.

Best Regards,
Ifat

From: Sören Henning <so...@email.uni-kiel.de>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Tuesday, 3 January 2023 at 14:56
To: "user@beam.apache.org" <us...@beam.apache.org>
Subject: Re: Join streams with different frequencies


Hi,

While I cannot give you a definite answer to your question, my Stack Overflow question may be of interest to you: https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i

Best regards,
Sören

