Posted to user@beam.apache.org by Tomas Gareau <to...@tenera.care> on 2021/03/26 22:34:35 UTC

[Question] Aggregating messages by event ID and dropping late data

I'm trying to build a Beam pipeline to aggregate events from an 
unbounded source.

These events work as follows:

 1. Some event with a given ID occurs
      * This ID may be re-used but will _never_ be re-used within *Y*
        seconds
 2. Any number of listeners report that event -- the *event* times will
    all be within a couple ms of each other
 3. These events need to be aggregated by `id` to form some output message
      * The order of the messages is not important, just the fact that
        they are included

For example, given an input message in CSV format of `id,message`, I 
would expect the inputs below:

"1,this "
"1,is "
"1,event "
"2,two "
"1,one "
"2,chiming "
"2,in "

to give the outputs:

"1,this is event one"
"2,two chiming in"
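
To make the expected result concrete, here is a minimal plain-Python sketch of the aggregation itself, ignoring timing entirely. The function name `aggregate_by_id` is made up for illustration, and stripping the trailing space is an assumption taken from the sample output above.

```python
from collections import defaultdict


def aggregate_by_id(lines):
    """Concatenate the message fragments for each id, in arrival order."""
    parts = defaultdict(list)
    for line in lines:
        # Split on the first comma only, so messages may contain commas.
        event_id, message = line.split(",", 1)
        parts[event_id].append(message)
    # Assumption: the trailing space of the last fragment is dropped,
    # as in the sample output.
    return [f"{event_id},{''.join(msgs).rstrip()}" for event_id, msgs in parts.items()]


inputs = ["1,this ", "1,is ", "1,event ", "2,two ", "1,one ", "2,chiming ", "2,in "]
outputs = aggregate_by_id(inputs)
print(outputs)
```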


With network latency, messages will have different amounts of lag 
getting to Beam. I'm more concerned with latency than completeness -- 
it's fine if we miss a couple messages from listeners -- and the 
downstream processing is not currently equipped to re-process events if 
late data comes in, so I'm happy to completely discard late data.

What I'd like to express in Beam is as follows:

  * A message with event ID *1* arrives → start a new window, wait *X*
    seconds to see if another message with event ID *1* arrives
      o A message with event ID *1* arrives in _less than_ *X* seconds →
        add it to the window and wait another *X* seconds
      o A message with event ID *1* arrives in _more than_ *X* seconds
        but _less than_ *Y* seconds → discard it
      o A message with event ID *1* arrives in _more than_ *Y* seconds →
        start a new window
  * If the *X* second timer for a window expires, trigger a pane with
    whatever events have happened to come in
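
The rules above can be sketched as a small per-ID state machine in plain Python, outside of Beam. This is only a simulation under stated assumptions: arrival times are given in seconds and sorted, *X* is measured from the most recent accepted message, *Y* is measured from the first message of the window, and the names `aggregate`, `x` and `y` are made up for illustration.

```python
def aggregate(events, x, y):
    """Simulate the desired behaviour.

    events: list of (arrival_seconds, event_id, message), sorted by arrival.
    Returns a list of (event_id, joined_message) in the order windows close.
    """
    state = {}    # event_id -> {"start", "last", "parts", "open"}
    outputs = []

    def flush(event_id):
        s = state[event_id]
        if s["open"]:
            outputs.append((event_id, "".join(s["parts"]).rstrip()))
            s["open"] = False

    for arrival, event_id, message in events:
        # Fire every X-second timer that expired before this arrival,
        # earliest deadline first.
        expired = [k for k, s in state.items()
                   if s["open"] and arrival - s["last"] >= x]
        for key in sorted(expired, key=lambda k: state[k]["last"]):
            flush(key)

        s = state.get(event_id)
        if s is not None and s["open"]:
            # Within X seconds of the previous message: extend the window.
            s["parts"].append(message)
            s["last"] = arrival
        elif s is None or arrival - s["start"] >= y:
            # First message for this ID, or the ID is being re-used.
            state[event_id] = {"start": arrival, "last": arrival,
                               "parts": [message], "open": True}
        # Otherwise the window has closed and Y has not elapsed: discard.

    # End of input: emit whatever is still open.
    for key in sorted(state, key=lambda k: state[k]["last"]):
        flush(key)
    return outputs


events = [
    (0.0, "1", "this "), (0.5, "1", "is "), (1.0, "2", "two "),
    (1.2, "1", "one "), (5.0, "1", "late "), (12.0, "1", "again "),
]
result = aggregate(events, x=2, y=10)
print(result)
```

With `x=2` and `y=10`, the message arriving at 5.0 seconds is discarded and the one at 12.0 seconds opens a new window for the same ID.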

At first, session windows seem like a great candidate: I can set the gap 
duration to *X* and events within *X* seconds of each other will be 
assigned to the same window -- great! However, I'm unable to drop events 
that are more than *X* but less than *Y* seconds apart -- Beam will 
instead consider this a new session.
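
The session behaviour described above can be illustrated with a small plain-Python sketch for a single key. It is not Beam code: it assumes each element opens a window of length `gap` starting at its timestamp and that overlapping windows merge, which is my understanding of how session windows behave. The function name `sessionize` is made up for illustration.

```python
def sessionize(timestamps, gap):
    """Group timestamps into sessions separated by at least `gap` seconds."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][-1] < gap:
            # Overlaps the current session: merge into it.
            sessions[-1].append(t)
        else:
            # Gap of at least `gap` seconds: a new session starts here.
            sessions.append([t])
    return sessions


# With X = 2, the element at 5.0 is not dropped; it opens a second session.
sessions = sessionize([0.0, 0.5, 1.2, 5.0], gap=2)
print(sessions)
```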

How could I configure Beam to trigger *X* seconds after the most recent 
element and drop elements that arrive *X* < `arrival time` < *Y*?

Cheers,
Tomas

Note: it feels like I'm fighting the framework here, which is often a 
sign that I could maybe approach it from a framework point-of-view -- if 
there are better ways to address this use-case that are more in line 
with Beam's philosophy I'd love to hear them too!


Re: [Question] Aggregating messages by event ID and dropping late data

Posted by Tomas Gareau <to...@tenera.care>.
Quick note for anyone else that comes across a similar use-case: looks 
like this is a prime candidate for Stateful DoFns:

https://beam.apache.org/documentation/programming-guide/#state-and-timers
