Posted to user@flink.apache.org by Dheeraj Taneja <dh...@gmail.com> on 2022/06/13 17:55:14 UTC

Flink - aggregated output with status progression

Hello,

I have a stream of events arriving over Kafka. Each event progresses
through a series of statuses. I want to display an aggregated count of how
many events are in each status. If an event has progressed from status A
to status C, it should be counted only under the last status it was in.
Below are sample data and the expected output.
What is the most effective way of doing this in Flink?

Sample Data

Event1: Event(Id1, statusA, 2022-06-09T16:15:08Z)
Event2: Event(Id2, statusA, 2022-06-09T16:20:08Z)
Event3: Event(Id1, statusB, 2022-06-09T16:25:08Z)
Event4: Event(Id1, statusC, 2022-06-09T16:26:08Z)
Event5: Event(Id3, statusC, 2022-06-09T16:30:08Z)

Outcome

Status A - 1 (Id2)
Status B - 0 (none)
Status C - 2 (Id1 & Id3)

Re: Flink - aggregated output with status progression

Posted by Xuyang <xy...@163.com>.
Hi, what about using "Top1 + Agg" or a UDAF for your scenario?

The main idea, I think, is that when an event changes from status A to status C, Flink needs to send a `DELETE` record downstream to retract the old row and then send the new one. `TOP1` keeps only the newest row for each `Id`, and the aggregation on top of it counts the rows per status.

Please tell me if this plan works.
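
To make the retraction idea concrete, here is a minimal sketch in plain Python. It does not use any Flink API; it only simulates what "Top1 + Agg" would do with the sample data from the question. The function name `count_by_latest_status` and the tuple layout of the events are assumptions made for illustration.

```python
from collections import Counter
from datetime import datetime


def count_by_latest_status(events):
    """Count ids per status, counting each id only under its newest status.

    events: iterable of (event_id, status, iso_timestamp) tuples.
    This is a plain-Python simulation, not Flink code.
    """
    latest = {}         # event_id -> (timestamp, status): the "Top1" row per id
    counts = Counter()  # status -> number of ids currently in that status
    for event_id, status, ts in events:
        when = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        previous = latest.get(event_id)
        if previous is not None:
            if when < previous[0]:
                continue  # older event arriving late: keep the newer row
            counts[previous[1]] -= 1  # retract ("DELETE") the old status
        latest[event_id] = (when, status)
        counts[status] += 1  # emit the new row for this id
    return counts


# Sample data from the original question.
events = [
    ("Id1", "statusA", "2022-06-09T16:15:08Z"),
    ("Id2", "statusA", "2022-06-09T16:20:08Z"),
    ("Id1", "statusB", "2022-06-09T16:25:08Z"),
    ("Id1", "statusC", "2022-06-09T16:26:08Z"),
    ("Id3", "statusC", "2022-06-09T16:30:08Z"),
]
counts = count_by_latest_status(events)
for status in sorted(counts):
    print(status, counts[status])
```

With the sample data this yields 1 for statusA, 0 for statusB and 2 for statusC, which matches the expected outcome. In Flink itself the same two steps would be expressed as a keep-latest-row-per-`Id` step followed by a count grouped by status, with Flink emitting the retractions.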

--

    Best!
    Xuyang