Posted to user@flink.apache.org by natasky <na...@gmail.com> on 2019/11/24 13:20:23 UTC

Window-join DataStream (or KeyedStream) with a broadcast stream

Hi all,

I use window aggregation to create a stream of aggregated data per user, per
some interval.

Additionally, I use the same windows to aggregate system-wide data for the same
interval.

I.e.:

Per user stream: events keyed by user ID -> tumbling window -> aggregation

System-wide stream: events -> tumbling window (windowAll) -> aggregation

I need to produce a value per user, per interval, that depends on the
aggregated data from that user and the system-wide data aggregated for the
corresponding interval.
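A minimal plain-Java sketch of the desired result (no Flink dependency, so it only models the semantics). It assumes events carry a user ID, a millisecond timestamp and a value, that the aggregation is a sum, and that the derived per-user value is the user's share of the system-wide sum for the same tumbling window. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, not Flink code. Hypothetical assumptions: sum aggregation,
// derived value = user's share of the system-wide sum in the same window.
class PerUserShareModel {
    record Event(String userId, long timestamp, long value) {}

    // Start of the tumbling window containing ts (zero offset, non-negative ts).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Like keyBy(userId) -> tumbling window -> sum: windowStart -> (userId -> sum).
    static Map<Long, Map<String, Long>> perUser(List<Event> events, long size) {
        Map<Long, Map<String, Long>> out = new TreeMap<>();
        for (Event e : events) {
            out.computeIfAbsent(windowStart(e.timestamp(), size), w -> new HashMap<>())
               .merge(e.userId(), e.value(), Long::sum);
        }
        return out;
    }

    // Like windowAll -> tumbling window -> sum: windowStart -> sum over all users.
    static Map<Long, Long> global(List<Event> events, long size) {
        Map<Long, Long> out = new TreeMap<>();
        for (Event e : events) {
            out.merge(windowStart(e.timestamp(), size), e.value(), Long::sum);
        }
        return out;
    }

    // Joins both results on the window: windowStart -> (userId -> share of the total).
    static Map<Long, Map<String, Double>> shares(List<Event> events, long size) {
        Map<Long, Long> totals = global(events, size);
        Map<Long, Map<String, Double>> out = new TreeMap<>();
        perUser(events, size).forEach((w, users) -> {
            long total = totals.get(w);
            Map<String, Double> row = new HashMap<>();
            users.forEach((u, sum) -> row.put(u, total == 0 ? 0.0 : sum.doubleValue() / total));
            out.put(w, row);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("alice", 1_000, 3), new Event("bob", 2_000, 1),
                new Event("alice", 61_000, 2), new Event("bob", 62_000, 2));
        // Window [0, 60000): alice 3 of 4, bob 1 of 4; window [60000, 120000): 2 of 4 each.
        System.out.println(shares(events, 60_000));
    }
}
```

In a batch model the join is trivial because both aggregates are available at once; in a stream they arrive as two separate outputs that still have to be matched by window.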

I couldn't find a way to achieve this with Flink's windows. I think I can
get it to work with broadcast, connect and CoProcessFunction - is that the
way to go? How would I handle late events that way?

Thanks!
- Nathan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window-join DataStream (or KeyedStream) with a broadcast stream

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

So you are trying to use the same window definition, but you want to aggregate the data in two different ways:

1. keyBy(userId)
2. Global aggregation

Do you want to use exactly the same aggregation functions? If not, you can just process the events twice:


DataStream<…> events = …;

DataStream<….> keyedEvents = events
	.keyBy(…)
	.window(…)
	.process(f); // instead of process this can be whatever you want: aggregate/apply/reduce/...
DataStream<….> nonKeyedEvents = events
	.windowAll(…) // same window definition, applied to all events
	.process(g);


From here you can process keyedEvents and nonKeyedEvents as you prefer.
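One way to combine the two results is the connect/CoProcessFunction idea from the question. The plain-Java sketch below (not Flink API) models that join under stated assumptions: each result carries the end timestamp of its window, the system-wide results would be broadcast to every parallel instance, and per-user results that arrive before their window's global total are buffered. `onUserResult` and `onGlobalResult` stand in for the two input methods of a CoProcessFunction (`processElement1`/`processElement2`), or of a KeyedBroadcastProcessFunction (`processElement`/`processBroadcastElement`), and the maps stand in for Flink state; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of joining per-user and system-wide window results by window,
// in whatever order they arrive. Hypothetical names throughout.
class WindowJoinModel {
    record UserResult(String userId, long windowEnd, long sum) {}
    record GlobalResult(long windowEnd, long total) {}
    record Joined(String userId, long windowEnd, long sum, long total) {}

    // Stands in for broadcast state: windowEnd -> system-wide total.
    private final Map<Long, Long> totals = new HashMap<>();
    // Per-user results whose system-wide total has not arrived yet, by windowEnd.
    private final Map<Long, List<UserResult>> pending = new HashMap<>();
    // Emitted records (a Collector in Flink).
    final List<Joined> output = new ArrayList<>();

    // Input 1: a per-user window result.
    void onUserResult(UserResult r) {
        Long total = totals.get(r.windowEnd());
        if (total != null) {
            output.add(new Joined(r.userId(), r.windowEnd(), r.sum(), total));
        } else {
            pending.computeIfAbsent(r.windowEnd(), w -> new ArrayList<>()).add(r);
        }
    }

    // Input 2: the system-wide result for one window; releases buffered per-user results.
    void onGlobalResult(GlobalResult g) {
        totals.put(g.windowEnd(), g.total());
        List<UserResult> waiting = pending.remove(g.windowEnd());
        if (waiting != null) {
            for (UserResult r : waiting) {
                output.add(new Joined(r.userId(), r.windowEnd(), r.sum(), g.total()));
            }
        }
    }

    public static void main(String[] args) {
        WindowJoinModel join = new WindowJoinModel();
        join.onUserResult(new UserResult("alice", 60_000, 3)); // buffered: no total yet
        join.onGlobalResult(new GlobalResult(60_000, 4));      // releases alice
        join.onUserResult(new UserResult("bob", 60_000, 1));   // joined immediately
        join.output.forEach(System.out::println);
    }
}
```

This model ignores late data and state cleanup. If windows fire again because of allowed lateness, a global total can change after some users were already joined, and entries for old windows would eventually need to be cleared (for example with event-time timers), so a real implementation would likely need to handle both.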

If yes, that is, if both the global and the per-user aggregations use similar or identical aggregation functions, you could try to use `keyedEvents` as pre-aggregated input for `.windowAll(…)`.


DataStream<….> keyedEvents = events.keyBy(…).window(…).process(f);

keyedEvents.print(); // or further process keyedEvents
DataStream<….> nonKeyedEvents = keyedEvents.windowAll(…).process(f'); // f' combines the per-key results of f


But this assumes that the output of your `process(f)` can be re-processed, i.e. that the per-key results can themselves be aggregated again into the global result. This second approach can minimise the amount of work done by the global aggregation. In the first approach, all of the records have to be processed by a single operator (the global aggregation, as `windowAll` is a non-parallel operation), which can become a performance bottleneck.
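A plain-Java sketch (not Flink code) of what "can be re-processed" means, for a single window and assuming, hypothetically, that the aggregation is an average. The per-user stage keeps a mergeable (sum, count) pair, so the global stage only merges one partial result per user and still gets the exact global average; averaging the per-user averages instead would weight every user equally and give a different result.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of two-stage aggregation for one window. Hypothetical names.
class TwoStageAverage {
    // Mergeable partial result: keep (sum, count), not the average itself.
    record SumCount(long sum, long count) {
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double average() {
            return (double) sum / count;
        }
    }

    record Event(String userId, long value) {}

    // Stage 1, like keyBy(...).window(...).process(f): userId -> partial result.
    static Map<String, SumCount> perUser(List<Event> events) {
        Map<String, SumCount> partials = new TreeMap<>();
        for (Event e : events) {
            partials.merge(e.userId(), new SumCount(e.value(), 1), SumCount::merge);
        }
        return partials;
    }

    // Stage 2, like windowAll(...).process(f'): merges one partial per user.
    static SumCount global(Map<String, SumCount> partials) {
        SumCount acc = new SumCount(0, 0);
        for (SumCount p : partials.values()) {
            acc = acc.merge(p);
        }
        return acc;
    }

    // Not a valid re-aggregation: averages of averages ignore per-user counts.
    static double averageOfAverages(Map<String, SumCount> partials) {
        return partials.values().stream()
                .mapToDouble(SumCount::average)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("alice", 1), new Event("alice", 2),
                new Event("alice", 3), new Event("bob", 10));
        Map<String, SumCount> partials = perUser(events);
        System.out.println("global average:      " + global(partials).average()); // 16 / 4 = 4.0
        System.out.println("average of averages: " + averageOfAverages(partials)); // (2.0 + 10.0) / 2 = 6.0
    }
}
```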

Piotrek
