Posted to user@flink.apache.org by Kristinn Danielsson via user <us...@flink.apache.org> on 2022/09/06 07:44:10 UTC

Mixed up session aggregations for same key

Hi,

I'm trying to migrate a Kafka Streams application to Flink (using the DataStream API).
The application consumes a high-traffic Kafka topic (millions of events per second)
and collects events into sessions keyed by id. To reduce the load on
subsequent processing steps, I want to output one event on session start and one
event on session end. So I set up a pipeline that keys the stream by id and
aggregates the events over an event-time session window with a gap of 4 seconds.
I also implemented a custom trigger that fires when the first event
arrives in a window.

When I run this pipeline, I sometimes observe multiple calls to the
aggregator's "createAccumulator" method for a given session id, and as a result I
also get duplicate session start and session end events for that id.
So it looks to me like Flink is collecting the events into multiple sessions
even though they have the same session id.

Examples:

    Input events:
        Event timestamp         Id
        2022-09-06 08:00:00     ABC
        2022-09-06 08:00:01     ABC
        2022-09-06 08:00:02     ABC
        2022-09-06 08:00:03     ABC
        2022-09-06 08:00:04     ABC
        2022-09-06 08:00:05     ABC

    Problem 1:
        Output events:
            Event time              Id      Type
            2022-09-06 08:00:00     ABC     Start
            2022-09-06 08:00:03     ABC     End
            2022-09-06 08:00:04     ABC     Start
            2022-09-06 08:00:05     ABC     End
    Problem 2:
        Output events:
            Event time              Id      Type
            2022-09-06 08:00:00     ABC     Start
            2022-09-06 08:00:03     ABC     Start
            2022-09-06 08:00:04     ABC     End
            2022-09-06 08:00:05     ABC     End

    Expected output:
        Event time              Id      Type
        2022-09-06 08:00:00     ABC     Start
        2022-09-06 08:00:05     ABC     End
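
As a quick check of the expected output above, here is a plain-Java sketch (no Flink dependency; the class and method names are hypothetical) that groups one key's sorted event timestamps into sessions with a 4-second gap. Consecutive events are one second apart, so every gap is well under 4 s and all six events fall into a single session whose first event is at 08:00:00 and whose last is at 08:00:05. In Flink terms, that merged session window would span [08:00:00, 08:00:09), since each event's window ends one gap after its timestamp. The sketch starts a new session only when two events are more than the gap apart; that boundary rule is an assumption, and the example never hits it.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups ONE key's event timestamps into gap-based sessions.
final class SessionGrouping {

    // A session is described by its first and last event timestamps.
    record Session(LocalDateTime firstEvent, LocalDateTime lastEvent) {}

    // Assumes timestamps are sorted ascending. A new session starts when the
    // distance to the previous event exceeds the gap (boundary rule is an assumption).
    static List<Session> group(List<LocalDateTime> sorted, Duration gap) {
        List<Session> sessions = new ArrayList<>();
        LocalDateTime first = null;
        LocalDateTime last = null;
        for (LocalDateTime ts : sorted) {
            if (last != null && Duration.between(last, ts).compareTo(gap) > 0) {
                sessions.add(new Session(first, last)); // close the previous session
                first = null;
            }
            if (first == null) {
                first = ts; // this event opens a new session
            }
            last = ts;
        }
        if (first != null) {
            sessions.add(new Session(first, last));
        }
        return sessions;
    }

    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        List<LocalDateTime> events = new ArrayList<>();
        for (int s = 0; s <= 5; s++) {
            events.add(LocalDateTime.of(2022, 9, 6, 8, 0, s)); // 08:00:00 .. 08:00:05
        }
        for (Session session : group(events, Duration.ofSeconds(4))) {
            System.out.println(fmt.format(session.firstEvent()) + "     ABC     Start");
            System.out.println(fmt.format(session.lastEvent()) + "     ABC     End");
        }
    }
}
```

Running main prints a single Start line for 08:00:00 and a single End line for 08:00:05, matching the expected output.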


Is this expected behaviour? How can I avoid getting duplicate session windows?

Thanks for your help
Kristinn

Re: Mixed up session aggregations for same key

Posted by David Anderson <da...@apache.org>.
The way that Flink handles session windows is that every new event is
initially assigned to its own session window, and then overlapping sessions
are merged. I imagine this is why you are seeing so many calls
to createAccumulator.
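
A simplified plain-Java model of this assign-then-merge behavior may make it more concrete. It is not Flink's implementation: the class and method names are hypothetical, times are seconds after 08:00:00, and windows that overlap or merely touch are treated as mergeable (an assumption about the boundary case). Each arriving event first gets its own window [t, t + gap), then all of the key's windows are sorted by start and intersecting adjacent windows are merged. Fed in timestamp order, the example events never produce more than one session for ABC. If 08:00:05 happens to be processed before 08:00:02, though, the key briefly holds two sessions, [0, 4) and [5, 9), until the 08:00:02 event bridges them into [0, 9). This is one plausible way (an assumption, not something confirmed in the thread) that a trigger firing on a window's first event could report two starts for the same id.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified model of session-window assignment and merging for ONE key.
// Times are plain seconds after 08:00:00 to keep the example short.
final class SessionMergeModel {

    // Half-open interval [start, end), like a time window.
    record Window(long start, long end) {
        // Assumption: windows that overlap or touch are merged.
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window covering both inputs.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final long gap;
    private List<Window> sessions = new ArrayList<>();

    SessionMergeModel(long gap) {
        this.gap = gap;
    }

    // Step 1: the event gets its own window [t, t + gap).
    // Step 2: all windows are sorted by start and intersecting neighbors are merged.
    void add(long t) {
        List<Window> all = new ArrayList<>(sessions);
        all.add(new Window(t, t + gap));
        all.sort(Comparator.comparingLong(Window::start));
        List<Window> merged = new ArrayList<>();
        for (Window w : all) {
            int lastIdx = merged.size() - 1;
            if (lastIdx >= 0 && merged.get(lastIdx).intersects(w)) {
                merged.set(lastIdx, merged.get(lastIdx).cover(w));
            } else {
                merged.add(w);
            }
        }
        sessions = merged;
    }

    List<Window> sessions() {
        return List.copyOf(sessions);
    }

    public static void main(String[] args) {
        SessionMergeModel outOfOrder = new SessionMergeModel(4);
        for (long t : new long[] {0, 5, 2}) { // 08:00:05 processed before 08:00:02
            outOfOrder.add(t);
            System.out.println("after t=" + t + ": " + outOfOrder.sessions());
        }
    }
}
```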

This implementation choice is deeply embedded in the code; I don't think it
can be avoided.

If you can afford to wait until a session ends to emit the session start
event, then you will only be reporting once for each session. Another
solution might be to implement your own windowing using a process function,
but if you are using event-time logic, and if the events can be
processed out of order, I suspect it would be difficult to do much better.
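
The process-function idea can be sketched roughly as follows. This is a plain-Java model, not Flink code: the class and method names are hypothetical, watermarks are passed in by hand, and timestamps are plain seconds. In an actual job the per-key fields would presumably live in keyed state (for example a ValueState) inside a KeyedProcessFunction, and the close-out check would be an event-time timer registered through ctx.timerService().registerEventTimeTimer(...) at lastSeen + gap. The model emits Start as soon as a key's first event is seen, and End (with the last event's timestamp) once the watermark reaches lastSeen + gap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a per-key "session start/end" process function.
// Timestamps are seconds; watermarks are delivered explicitly via onWatermark.
final class SessionStartEndModel {

    record Output(String key, long time, String type) {}

    // Per-key state; in Flink this would be keyed state such as a ValueState.
    private static final class SessionState {
        long lastSeen;

        SessionState(long ts) {
            this.lastSeen = ts;
        }
    }

    private final long gap;
    private final Map<String, SessionState> state = new HashMap<>();
    private final List<Output> out = new ArrayList<>();

    SessionStartEndModel(long gap) {
        this.gap = gap;
    }

    // Called once per event: emits Start only when the key has no open session.
    void onEvent(String key, long ts) {
        SessionState s = state.get(key);
        if (s == null) {
            state.put(key, new SessionState(ts));
            out.add(new Output(key, ts, "Start"));
        } else {
            s.lastSeen = Math.max(s.lastSeen, ts);
        }
    }

    // Plays the role of event-time timers: a session closes once the watermark
    // reaches lastSeen + gap, and End is reported with the last event's timestamp.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, SessionState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> e = it.next();
            if (watermark >= e.getValue().lastSeen + gap) {
                out.add(new Output(e.getKey(), e.getValue().lastSeen, "End"));
                it.remove();
            }
        }
    }

    List<Output> outputs() {
        return List.copyOf(out);
    }

    public static void main(String[] args) {
        SessionStartEndModel f = new SessionStartEndModel(4);
        for (long t = 0; t <= 5; t++) {
            f.onEvent("ABC", t);
            f.onWatermark(t); // the watermark trails the events in this toy run
        }
        f.onWatermark(100); // e.g. end of input
        f.outputs().forEach(System.out::println);
    }
}
```

For the in-order example this emits exactly one Start at 0 and one End at 5. The weak spot is the one described above: Start goes out immediately, so an out-of-order event older than the reported start cannot un-send it (the model only extends lastSeen), and an event arriving after its session has already closed opens a new session with a second Start. Avoiding that would likely mean delaying the Start output until the watermark has passed, which is close to the first option of waiting for the session to end.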

David
