Posted to user@flink.apache.org by Kristinn Danielsson via user <us...@flink.apache.org> on 2022/09/06 07:44:10 UTC
Mixed up session aggregations for same key
Hi,
I'm trying to migrate a KafkaStreams application to Flink (using the DataStream API).
The application consumes a high-traffic (millions of events per second) Kafka
topic and collects events into sessions keyed by id. To reduce the load on
subsequent processing steps, I want to output one event on session start and one
event on session end. So I set up a pipeline which keys the stream by id and
aggregates the events over an event-time session window with a gap of 4 seconds.
I also implemented a custom trigger that fires when the first event
arrives in a window.
When I run this pipeline I sometimes observe multiple calls to the
aggregator's "createAccumulator" method for a given session id, and therefore I
also get duplicate session start and session end events for that session id.
So it looks to me as if Flink is collecting the events into multiple sessions
even though they have the same session id.
Examples:
Input events:
Event timestamp      Id
2022-09-06 08:00:00  ABC
2022-09-06 08:00:01  ABC
2022-09-06 08:00:02  ABC
2022-09-06 08:00:03  ABC
2022-09-06 08:00:04  ABC
2022-09-06 08:00:05  ABC
Problem 1:
Output events:
Event time           Id   Type
2022-09-06 08:00:00  ABC  Start
2022-09-06 08:00:03  ABC  End
2022-09-06 08:00:04  ABC  Start
2022-09-06 08:00:05  ABC  End
Problem 2:
Output events:
Event time           Id   Type
2022-09-06 08:00:00  ABC  Start
2022-09-06 08:00:03  ABC  Start
2022-09-06 08:00:04  ABC  End
2022-09-06 08:00:05  ABC  End
Expected output:
Event time           Id   Type
2022-09-06 08:00:00  ABC  Start
2022-09-06 08:00:05  ABC  End
Is this expected behaviour? How can I avoid getting duplicate session windows?
Thanks for your help
Kristinn
Re: Mixed up session aggregations for same key
Posted by David Anderson <da...@apache.org>.
The way that Flink handles session windows is that every new event is
initially assigned to its own session window, and then overlapping sessions
are merged. I imagine this is why you are seeing so many calls
to createAccumulator.
This implementation choice is deeply embedded in the code; I don't think it
can be avoided.
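
To make the assign-then-merge idea concrete, here is a minimal plain-Java
model of it. It is a sketch, not Flink's code: the class SessionMergeSketch,
its Window type, and the addEvent helper are hypothetical names invented for
illustration, and treating windows that merely touch as mergeable is an
assumption of the sketch. Each event first gets its own window
[timestamp, timestamp + gap), and any existing session that intersects it is
folded in.

```java
import java.util.ArrayList;
import java.util.List;

class SessionMergeSketch {

    /** A session window covering [start, end), in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Assumption of this sketch: windows that merely touch are merged too.
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window that spans both this window and the other one.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Gives the event its own window [timestamp, timestamp + gap) and folds in
     * every existing session that intersects it. Existing sessions are assumed
     * to be pairwise disjoint, so a single pass is enough.
     */
    static List<Window> addEvent(List<Window> sessions, long timestamp, long gapMillis) {
        Window merged = new Window(timestamp, timestamp + gapMillis);
        List<Window> result = new ArrayList<>();
        for (Window w : sessions) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        long gap = 4_000L;
        // Offsets in milliseconds of the six example events, one second apart.
        long[] offsets = {0L, 1_000L, 2_000L, 3_000L, 4_000L, 5_000L};
        List<Window> sessions = new ArrayList<>();
        for (long ts : offsets) {
            sessions = addEvent(sessions, ts, gap);
        }
        System.out.println(sessions); // a single merged window: [[0, 9000)]
    }
}
```

In this model, six events one second apart with a 4-second gap collapse into
one session, while events further apart than the gap stay in separate
sessions. Every arriving event still starts out as a brand-new window before
the merge happens.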
If you can afford to wait until a session ends to emit the session start
event, then you will only be reporting once for each session. Another
solution might be to implement your own windowing using a process function
-- but if you are using event time logic, and if the events can be
processed out of order, I suspect it would be difficult to do much better.
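
As a rough illustration of the hand-rolled alternative, the following
plain-Java sketch models the bookkeeping such a process function might do. It
is not Flink API code: ManualSessionizer, onEvent, and onWatermark are
hypothetical names, the map stands in for keyed state, and onWatermark stands
in for an event-time timer firing. It assumes events for a key arrive in
timestamp order, so it does not solve the out-of-order problem mentioned
above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ManualSessionizer {
    private final long gapMillis;
    // Timestamp of the most recent event per id; stands in for keyed state.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Emitted records, formatted as "<timestamp> <id> <Start|End>".
    private final List<String> output = new ArrayList<>();

    ManualSessionizer(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Emits Start for the first event of a session, then only tracks the latest timestamp. */
    void onEvent(String id, long timestamp) {
        Long previous = lastSeen.get(id);
        if (previous == null) {
            output.add(timestamp + " " + id + " Start");
            lastSeen.put(id, timestamp);
        } else {
            lastSeen.put(id, Math.max(previous, timestamp));
        }
    }

    /** Closes every session whose last event is at least one gap behind the watermark. */
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + gapMillis <= watermark) {
                output.add(entry.getValue() + " " + entry.getKey() + " End");
                it.remove();
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ManualSessionizer sessionizer = new ManualSessionizer(4_000L);
        for (long ts = 0L; ts <= 5_000L; ts += 1_000L) {
            sessionizer.onEvent("ABC", ts);
        }
        sessionizer.onWatermark(9_000L);
        System.out.println(sessionizer.output()); // [0 ABC Start, 5000 ABC End]
    }
}
```

With in-order input this emits exactly one Start and one End per session. If
an event older than the already-emitted Start shows up later, the Start
record cannot be taken back, which is the difficulty with out-of-order
event-time data.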
David