Posted to user@beam.apache.org by Tomas Gareau <to...@tenera.care> on 2021/03/26 22:34:35 UTC
[Question] Aggregating messages by event ID and dropping late data
I'm trying to build a Beam pipeline to aggregate events from an
unbounded source.
These events work as follows:
1. Some event with a given ID occurs
* This ID may be re-used but will _never_ be re-used within *Y*
seconds
2. Any number of listeners report that event -- the *event* times will
all be within a couple ms of each other
3. These events need to be aggregated by `id` to form some output message
* The order of the messages is not important, just the fact that
they are included
For example, given an input message in CSV format of `id,message`, I
would expect the inputs below:
"1,this "
"1,is "
"1,event "
"2,two "
"1,one "
"2,chiming "
"2,in "
to give the outputs:
"1,this is event one"
"2,two chiming in"
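Ignoring timing entirely, the expected output is a group-by-ID followed by concatenation. Below is a minimal plain-Python sketch of that reference behaviour. The function name `group_messages` is hypothetical, and messages within a group are kept in input order even though the post says order does not matter:

```python
from typing import Dict, List


def group_messages(lines: List[str]) -> List[str]:
    """Concatenate messages per event ID, ignoring timing entirely."""
    grouped: Dict[str, List[str]] = {}      # dicts keep first-seen ID order
    for line in lines:
        event_id, message = line.split(",", 1)
        grouped.setdefault(event_id, []).append(message)
    # Join the fragments and trim the trailing space left by the last one.
    return [f"{eid},{''.join(parts).strip()}" for eid, parts in grouped.items()]


inputs = ["1,this ", "1,is ", "1,event ", "2,two ", "1,one ", "2,chiming ", "2,in "]
print(group_messages(inputs))
# ['1,this is event one', '2,two chiming in']
```

In a pipeline this roughly corresponds to keying each line by its ID and grouping by key. The hard part is deciding when each group is complete.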
With network latency, messages will have different amounts of lag
getting to Beam. I'm more concerned with latency than completeness --
it's fine if we miss a couple messages from listeners -- and the
downstream processing is not currently equipped to re-process events if
late data comes in, so I'm happy to completely discard late data.
What I'd like to express in Beam is as follows:
* A message with event ID *1* arrives → start a new window, wait *X*
seconds to see if another message with event ID *1* arrives
o A message with event ID *1* arrives in _less than_ *X* seconds →
add it to the window and wait another *X* seconds
o A message with event ID *1* arrives in _more than_ *X* seconds
but _less than_ *Y* seconds → discard it
o A message with event ID *1* arrives in _more than_ *Y* seconds →
start a new window
* If the *X* second timer for a window expires, trigger a pane with
whatever events have happened to come in
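The decision rules in this list can be written down as a small function. This is a sketch under one stated assumption: *X* is measured from the most recent message for the ID, and *Y* from the first message of the current window. The post does not say which reference point *Y* uses. `classify` is a hypothetical helper, not a Beam API:

```python
from typing import Optional


def classify(now: float, last_arrival: Optional[float],
             window_start: Optional[float], x: float, y: float) -> str:
    """Return "new", "add" or "discard" for a message arriving at `now`.

    last_arrival / window_start describe the previous window for this ID and
    are None if the ID has never been seen. Assumption: Y is measured from
    window_start (the first message of that window).
    """
    if window_start is None or last_arrival is None:
        return "new"        # first message for this ID: open a window
    if now - last_arrival < x:
        return "add"        # within X of the most recent message: extend window
    if now - window_start < y:
        return "discard"    # window closed, ID cannot be reused yet: late data
    return "new"            # more than Y later: the ID has been reused


print(classify(3, last_arrival=0.5, window_start=0, x=2, y=10))  # discard
```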
At first, session windows seem like a great candidate: I can set the gap
duration to *X* and events within *X* seconds of each other will be
assigned to the same window -- great! However, I'm unable to drop events
that are more than *X* but less than *Y* seconds apart -- Beam will
instead consider this a new session.
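To see why sessions alone do not give the drop behaviour, here is a simplified plain-Python model of session merging for a single key. Each element opens an interval [t, t + gap), and overlapping intervals merge. It assumes the timestamps used for windowing reflect the spacing described in the post. `sessions` is a hypothetical helper:

```python
from typing import List


def sessions(timestamps: List[int], gap: int) -> List[List[int]]:
    """Group one key's timestamps the way merging session windows would.

    Each element gets the interval [t, t + gap). Intervals that overlap are
    merged, so a new session starts once the distance to the previous
    element reaches `gap`.
    """
    groups: List[List[int]] = []
    for t in sorted(timestamps):
        if groups and t < groups[-1][-1] + gap:
            groups[-1].append(t)    # overlaps the current session: merge
        else:
            groups.append([t])      # no overlap: a new session begins
    return groups


# Messages for ID 1 in milliseconds; the last one arrives 3 s after the first.
print(sessions([0, 100, 200, 500, 3500], gap=2000))
# [[0, 100, 200, 500], [3500]]
```

The message at 3500 ms becomes its own session instead of being discarded, because a session window has no notion of the longer *Y* interval.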
How could I configure Beam to trigger *X* seconds after the most recent
element, and drop elements that arrive between *X* and *Y* seconds later
(*X* < `arrival time` < *Y*)?
Cheers,
Tomas
Note: it feels like I'm fighting the framework here, which is often a
sign that I should approach the problem from the framework's point of
view. If there are better ways to address this use case that are more in
line with Beam's philosophy, I'd love to hear them too!
Re: [Question] Aggregating messages by event ID and dropping late data
Posted by Tomas Gareau <to...@tenera.care>.
Quick note for anyone else who comes across a similar use case: this
looks like a prime candidate for stateful DoFns (state and timers):
https://beam.apache.org/documentation/programming-guide/#state-and-timers
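One way the stateful approach could look, sketched as plain Python that mirrors the shape of a stateful DoFn: per-key state (buffered messages and the window's start time) plus one timer that is reset to now + *X* on every accepted message. All names (`EventAggregator`, `process`, `on_timer`, `advance_clock`) are hypothetical and this is not Beam's API. In an actual pipeline the dicts would be state cells declared through the SDK's state specs, and the deadline would be a processing-time timer. It assumes *Y* is measured from the first message of a window:

```python
from typing import Dict, List, Tuple


class EventAggregator:
    """Hypothetical plain-Python model of a per-key stateful DoFn with a timer."""

    def __init__(self, x: float, y: float):
        self.x, self.y = x, y
        self.parts: Dict[str, List[str]] = {}     # buffered messages, open windows only
        self.window_start: Dict[str, float] = {}  # first arrival of the latest window
        self.deadline: Dict[str, float] = {}      # pending X-second timer per key

    def process(self, key: str, msg: str, now: float) -> None:
        # Assumes every timer due at or before `now` has already fired.
        if key in self.deadline:                  # window open: add, reset timer
            self.parts[key].append(msg)
            self.deadline[key] = now + self.x
        elif key in self.window_start and now - self.window_start[key] < self.y:
            return                                # closed but within Y: drop late data
        else:                                     # new event, or ID reused after Y
            self.window_start[key] = now
            self.parts[key] = [msg]
            self.deadline[key] = now + self.x

    def on_timer(self, key: str) -> Tuple[str, str]:
        # Timer fired: emit the pane and clear the buffer, but keep
        # window_start so later messages can be recognised as late.
        del self.deadline[key]
        return key, "".join(self.parts.pop(key)).strip()

    def advance_clock(self, now: float) -> List[Tuple[str, str]]:
        # Test-harness helper: fire every timer due at or before `now`.
        due = sorted((d, k) for k, d in self.deadline.items() if d <= now)
        return [self.on_timer(k) for _, k in due]


agg = EventAggregator(x=2000, y=10000)            # X = 2 s, Y = 10 s, times in ms
arrivals = [(0, "1", "this "), (100, "1", "is "), (200, "1", "event "),
            (300, "2", "two "), (500, "1", "one "), (600, "2", "chiming "),
            (700, "2", "in "), (3500, "1", "late ")]
panes = []
for now, key, msg in arrivals:
    panes += agg.advance_clock(now)               # fire due timers before the element
    agg.process(key, msg, now)
panes += agg.advance_clock(20000)
print(panes)  # [('1', 'this is event one'), ('2', 'two chiming in')]
```

Design note: `window_start` is kept after a pane is emitted so that late messages can be recognised and dropped. In a real pipeline you would likely also set a second timer at window start + *Y* to clear that state; otherwise it grows with the number of distinct IDs.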