Posted to user@flink.apache.org by Miguel Araújo <mi...@gmail.com> on 2021/04/30 09:37:20 UTC

Guaranteeing event order in a KeyedProcessFunction

Hi everyone,

I have a KeyedProcessFunction whose events I would like to process in
event-time order.
My initial idea was to use a Map keyed by timestamp and, when a new event
arrives, iterate over the Map to process events older than the current
watermark.

The issue is that I obviously can't use a MapState, as my stream is keyed,
so the map would be scoped to the current key.
Is using a "regular" (i.e., not checkpointed) Map an option, given that its
content will be recreated by the replay of the events on a restart? Is it
guaranteed that the watermark that triggered the processing of multiple
events (and their subsequent push downstream) is not sent downstream before
these events themselves?

Thanks,
Miguel

Re: Guaranteeing event order in a KeyedProcessFunction

Posted by Miguel Araújo <mi...@gmail.com>.
I've realized this is not such a big issue, because the number of firings
is also upper-bounded by the number of watermarks received, and there won't
be one watermark per event.

Miguel Araújo <mi...@gmail.com> wrote on Monday,
10/05/2021 at 09:39:


Re: Guaranteeing event order in a KeyedProcessFunction

Posted by Miguel Araújo <mi...@gmail.com>.
Thanks Dawid, having a look at the CepOperator was useful. I implemented
something with one difference I feel might be important:

I noticed that in the CepOperator the timer is registered for
currentWatermark+1 instead of the event's timestamp. Is there a
reason for this? I think this implies a quadratic number of triggers in
the number of keys with events that arrived after the current watermark.
For instance, if you have 1000 events per second on different keys (and
different timestamps), a watermark that is delayed by 1 second will fire ~1
million times. Is this a requirement of the NFA implementation? Would this
not be a problem?
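
Writing the back-of-the-envelope count out (a simplified model, assuming one pending event per key and one watermark advance per incoming event; the real timer semantics are more involved):

```python
# Hypothetical load: 1000 keys with one pending event each, all due
# roughly 1 second in the future, and ~1000 watermark advances during
# that second (one per incoming event).
keys = 1000
watermark_advances = 1000

# Strategy 1: register the timer at currentWatermark + 1 and
# re-register on every firing until the event is releasable:
# each key's timer fires on every watermark advance.
firings_wm_plus_one = keys * watermark_advances

# Strategy 2: register the timer at the event's own timestamp:
# each pending event fires exactly once.
firings_event_ts = keys * 1

print(firings_wm_plus_one)  # ~1 million firings
print(firings_event_ts)     # 1000 firings
```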

Thanks, once again.

Dawid Wysakowicz <dw...@apache.org> wrote on Monday,
10/05/2021 at 09:13:


Re: Guaranteeing event order in a KeyedProcessFunction

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Miguel,

I think you could take a look at the CepOperator which does pretty much
what you are describing.

As for more direct answers to your questions: if you use a
KeyedProcessFunction, it is always scoped to a single key. There is
no way to process events from other keys. If you want to have more control
over state and e.g. use a PriorityQueue which would be snapshotted on
checkpoint, you could look into using the Operator API. Bear in mind it is
a semi-public API: it is very low level and subject to change rather
frequently. Another thing to consider is that if you use a PriorityQueue
instead of e.g. MapState for buffering and ordering events, you are
constrained by the available memory. We used a PriorityQueue in the past
in the CepOperator but migrated it to MapState.
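
For reference, the MapState-shaped layout looks roughly like this (a framework-agnostic Python sketch of MapState<Long, List<Event>>; in Flink the map would live in the state backend, which is why e.g. a RocksDB-backed job is not bounded by heap memory the way an in-memory PriorityQueue is):

```python
from collections import defaultdict

class MapStateBuffer:
    """Per-key buffer shaped like MapState<Long, List<Event>>:
    timestamp -> list of events carrying that timestamp."""

    def __init__(self):
        self._by_ts = defaultdict(list)

    def add(self, timestamp, event):
        self._by_ts[timestamp].append(event)

    def flush(self, watermark):
        """Drain events with timestamp <= watermark in ascending
        timestamp order; later timestamps stay buffered."""
        ready_ts = sorted(t for t in self._by_ts if t <= watermark)
        out = []
        for t in ready_ts:
            out.extend(self._by_ts.pop(t))
        return out

buf = MapStateBuffer()
buf.add(7, "a")
buf.add(2, "b")
buf.add(2, "c")
print(buf.flush(5))  # only the timestamp-2 events are releasable
```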

It is possible for events to become late in downstream operators. It
all depends on the timestamps of the events you emit from the "sorting"
operator: if you emit records with timestamps larger than the watermark
that "triggered" their emission, they can become late.

Hope those tips help you a bit.

Best,

Dawid

On 04/05/2021 14:49, Miguel Araújo wrote:

Re: Guaranteeing event order in a KeyedProcessFunction

Posted by Miguel Araújo <mi...@gmail.com>.
Hi Timo,

Thanks for your answer. I think I wasn't clear enough in my initial
message, so let me give more details.

The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
user-id) and then fed into a KeyedProcessFunction. I want to process all
events for a given user in order, before sending them downstream for
further processing in other operators. I don't want to hold events longer
than needed, hence using the watermark to signal which events can be
processed.
I don't think your suggestion of using a ListState would work, because we
would effectively have one list per user. That would imply (among other
things) that an event could only be processed when a new event for the same
user arrives, which would mean not only (potentially) huge latency, but
also data leakage. Not to mention that the events being sent could easily
be considered late events by the downstream operators.
The idea of keying by timestamp was an "evolution" of the ListState
suggestion, where events-to-be-later-processed would be kept sorted in the
map (which is what would be keyed by timestamp). We could iterate the map
to process the events, instead of fetching the full list and sorting it to
process the events in order. I don't think this solves any of the problems
mentioned above, so I think that mentioning it only raised confusion.

Regarding global event-time order, that's not really what I'm after. I only
need event-time order per key, but I want to process the event as soon as
possible, constrained by knowing that it is "safe" to do so because no
event with a smaller timestamp for this key is yet to come.

So, rephrasing my question as I'm not sure that part was clear in the
initial message, here is the idea:
- keeping one priority queue (ordered by timestamp) in each
KeyedProcessFunction instance. Therefore, each priority queue would store
events for multiple keys.
- when an event arrives, we push it to the queue and then process events
(updating state and sending them downstream) while their timestamp is lower
than the current watermark.

The question is:
- is this fault tolerant? The priority queue is not state that is managed
by Flink, but it should be recoverable on replay.
- is it possible that the events I'm sending downstream become late-events
for a different operator, for some reason? Will they always be sent before
the watermark of the event that originated the processElement() call?
- I would effectively be processing multiple elements (from multiple keys)
in the same call to processElement(). Is there a way to access the state of
different keys?

This doesn't feel like the right approach. Is there an operator more
suitable than a KeyedProcessFunction which would allow me to handle the
state for multiple keys in this task manager? Should I register a timer to
trigger on the event timestamp instead? I believe timers trigger on
watermarks, so that could theoretically work, although it feels a little
weird. After all, what I want is just to buffer events so that they are
only processed when the watermark has caught up to them.
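
For what it's worth, that timer-based buffering can be sketched in plain Python (the names process_element/advance_watermark are illustrative stand-ins for Flink's processElement/onTimer machinery, not the real API):

```python
import heapq

class SortingOperator:
    """Buffers events and registers an event-time timer at each
    event's timestamp; when the watermark passes a timer, buffered
    events up to the watermark are emitted in timestamp order."""

    def __init__(self):
        self._buffer = []   # min-heap of (timestamp, key, event)
        self._timers = []   # min-heap of timer timestamps
        self.emitted = []

    def process_element(self, key, timestamp, event):
        heapq.heappush(self._buffer, (timestamp, key, event))
        heapq.heappush(self._timers, timestamp)

    def advance_watermark(self, watermark):
        # Fire every timer whose timestamp the watermark has passed.
        while self._timers and self._timers[0] <= watermark:
            heapq.heappop(self._timers)
            self._on_timer(watermark)

    def _on_timer(self, watermark):
        while self._buffer and self._buffer[0][0] <= watermark:
            ts, key, event = heapq.heappop(self._buffer)
            self.emitted.append((key, ts, event))

op = SortingOperator()
op.process_element("u1", 5, "e5")
op.process_element("u2", 3, "e3")
op.advance_watermark(4)  # only e3 is safe to emit so far
print(op.emitted)
```

A later advance_watermark(6) would then release e5 as well, so each event leaves the operator exactly when the watermark catches up to it.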

Thanks

Timo Walther <tw...@apache.org> wrote on Friday, 30/04/2021 at
12:05:


Re: Guaranteeing event order in a KeyedProcessFunction

Posted by Timo Walther <tw...@apache.org>.
Hi Miguel,

Your initial idea doesn't sound too bad, but why do you want to key by
timestamp? Usually, you can simply key your stream by a custom key and
store the events in a ListState until a watermark comes in.

But if you really want to have some kind of global event-time order, you 
have two choices:

- either a single operator with parallelism 1 that performs the ordering
- or you send every event to every operator using the broadcast
state pattern [1]

It is guaranteed that a watermark will reach the downstream operator or
sink after all the events it covers. Watermarks are synchronized across all
parallel operator instances. You can store a Map uncheckpointed, but this
means you have to ensure that the map is initialized again during recovery.
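
As a toy illustration of that guarantee (plain Python, not Flink's actual network stack): a sorting operator emits every buffered record whose timestamp is covered by the watermark before forwarding the watermark itself, so those records can never be late downstream:

```python
def forward(buffered, watermark):
    """Emit all records releasable by `watermark`, then the watermark
    itself, preserving the records-before-watermark ordering."""
    ready = sorted((ts, ev) for ts, ev in buffered if ts <= watermark)
    held = [(ts, ev) for ts, ev in buffered if ts > watermark]
    channel = [("record", ts, ev) for ts, ev in ready]
    channel.append(("watermark", watermark, None))
    return channel, held

channel, held = forward([(3, "a"), (9, "b"), (1, "c")], watermark=5)
print(channel)  # records with ts <= 5 first, then the watermark
print(held)     # the timestamp-9 record stays buffered
```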

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html

On 30.04.21 11:37, Miguel Araújo wrote: