Posted to user@flink.apache.org by Chen Bekor <ch...@gmail.com> on 2016/03/27 10:50:34 UTC

flink streaming - window chaining example

Hi all!

I'm just getting started with Flink and I have a design question.

I'm trying to aggregate incoming events (source: a Kafka topic) in a 10-minute
tumbling window in order to calculate the incoming event rate (total events per
minute).
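To make the arithmetic of the first stage concrete, here is a minimal plain-Scala sketch (no Flink dependency) of what a 10-minute tumbling window computes. It assumes epoch-millisecond timestamps and reads "rate" as the average number of events per minute within each window; `TenMinuteRates`, `windowStart` and `perMinuteRates` are hypothetical names.

```scala
// Sketch: assign event timestamps (epoch millis) to 10-minute tumbling
// windows and turn each window's event count into a per-minute rate.
object TenMinuteRates {
  val WindowMillis: Long = 10 * 60 * 1000L

  // Start of the tumbling window that contains ts (assumes ts >= 0).
  def windowStart(ts: Long): Long = ts - (ts % WindowMillis)

  // Map from window start to the average events per minute in that window.
  def perMinuteRates(timestamps: Seq[Long]): Map[Long, Double] =
    timestamps
      .groupBy(windowStart)
      .map { case (start, events) => start -> events.size / 10.0 }
}
```

For example, three events with timestamps in [0, 600000) yield a rate of 0.3 events per minute for the window starting at 0.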

I would like to take the output of this window and apply an additional window
(60 minutes) in order to calculate percentiles, standard deviation and some other
statistics over that time window. Finally, I would like to trigger some business
logic in case the calculation hits a certain threshold.

My main challenge is how to chain the two windows together.

Any help is appreciated. Please send Scala example code, since I'm not using
Java for this project :)

Re: flink streaming - window chaining example

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
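import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties() // Kafka settings: zookeeper.connect, bootstrap.servers, group.id
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))
// SomeEventObj, AggEvents, MyMapFunction, MyWindowFunction1/2, MyTriggerFunction are user-defined
val event: DataStream[SomeEventObj] = stream.map(MyMapFunction)

// First window: 10 minutes over all events, firing every minute without purging
val tenMinute: DataStream[AggEvents] = event
  .timeWindowAll(Time.minutes(10))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
  .apply(MyWindowFunction1) // an AllWindowFunction; windowed streams have no map

// Second window: consumes the output of the first window, keyed by mykey
val oneHour = tenMinute
  .keyBy(_.mykey)
  .timeWindow(Time.minutes(60)) // same time characteristic as the first window
  .trigger(MyTriggerFunction)
  .apply(MyWindowFunction2) // computes the hourly statistics

env.execute("window chaining")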


The above is pseudocode and may have some syntax errors, but it should do
what you are looking for. There is a dependency between the ten-minute window
and the one-hour window function: the one-hour window consumes the output of
the ten-minute window, so one will execute after the other.
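The dependency between the two windows can be illustrated without Flink. In this plain-Scala sketch (hypothetical names, epoch-millisecond timestamps assumed), the first stage emits one count per 10-minute window tagged with its end time, and the second stage only ever sees those records, grouping them into 60-minute tumbling windows. It models the data flow only, not Flink's runtime or its triggers.

```scala
// Sketch: two chained tumbling windows in plain Scala. Stage 1 emits one
// (windowEnd, count) record per 10-minute window; stage 2 groups those
// records by the hour they fall into.
object ChainedWindows {
  val TenMin: Long = 10 * 60 * 1000L
  val OneHour: Long = 6 * TenMin

  final case class TenMinuteCount(windowEnd: Long, count: Int)

  // Stage 1: count events per 10-minute tumbling window, ordered by time.
  def tenMinuteCounts(timestamps: Seq[Long]): Seq[TenMinuteCount] =
    timestamps
      .groupBy(ts => ts - ts % TenMin)
      .toSeq
      .sortBy(_._1)
      .map { case (start, events) => TenMinuteCount(start + TenMin, events.size) }

  // Stage 2: consume only stage-1 output, grouped into 60-minute windows.
  // windowEnd - 1 is the last instant inside the 10-minute window, so a
  // window ending exactly on the hour stays in the hour it belongs to.
  def hourlyInputs(counts: Seq[TenMinuteCount]): Map[Long, Seq[Int]] =
    counts
      .groupBy(c => (c.windowEnd - 1) - (c.windowEnd - 1) % OneHour)
      .map { case (hour, cs) => hour -> cs.map(_.count) }
}
```

Because the hourly stage reads only the ten-minute results, each hourly window holds at most six of them, which is why the one-hour computation can only run after the ten-minute one has produced output.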
