Posted to user@flink.apache.org by William Saar <wi...@saar.se> on 2017/05/29 16:33:55 UTC

Porting batch percentile computation to streaming window

I am porting a calculation from Spark batch jobs that uses broadcast
variables to compute percentiles from metrics, and I would like some tips
on doing this with Flink streaming.

I have a windowed computation where I compute metrics for IP addresses
(a windowed stream of metrics objects grouped by IP address). Now I would
like to compute percentiles for each IP from the metrics.

My idea is to send all the metrics to a single node that computes a global
TDigest, and then to rejoin the computed global TDigest with the IP-grouped
metrics stream to compute the percentiles for each IP. Is there a neat way
to implement this in Flink?

I am curious about the best way to join a global value, like our TDigest,
with every result of a grouped window stream. Also, how can I know when the
TDigest is complete and has seen every element in the window (say, if I
implement it in a stateful flatMap that emits the value after seeing all
stream values)?

Thanks!
William


Re: Porting batch percentile computation to streaming window

Posted by Gyula Fóra <gy...@gmail.com>.
This is what you are looking for:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#incremental-window-aggregation-with-foldfunction
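A plain-Python simulation of the pattern that section describes, assuming one tumbling window and a (count, sum) accumulator: the fold step runs eagerly for each element, and only when the window fires does a window function see the folded value together with the window object, so it can read the window start without recomputing it from event timestamps. Names and tuple layouts are hypothetical; in the Java API, as I read the linked page, its example passes an initial value, a FoldFunction and a window function together to fold(...).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeWindow:
    start: int  # inclusive, epoch milliseconds
    end: int    # exclusive, epoch milliseconds


def fold_step(acc, value):
    # Incremental part: called once per element, keeps only (count, sum).
    count, total = acc
    return (count + 1, total + value)


def window_step(key, window, acc):
    # Called once when the window fires: sees the folded value AND the window.
    count, total = acc
    return (key, window.start, total / count)


def fire_window(key, window, values, initial=(0, 0.0)):
    acc = initial
    for value in values:
        acc = fold_step(acc, value)
    return window_step(key, window, acc)
```

The window start returned by window_step is the kind of window ID that the CoFlatMap join discussed in this thread relies on.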

Cheers,
Gyula

On Wed, 31 May 2017 at 1:36, William Saar <wi...@saar.se> wrote:

> Finally, does Flink offer functionality to retrieve information about the
> current window that a rich function is running on? I don't see anything in
> the RuntimeContext classes about the current window...

Re: Porting batch percentile computation to streaming window

Posted by William Saar <wi...@saar.se>.
Nice! The solution is actually starting to look quite clean with this
in place.

Finally, does Flink offer functionality to retrieve information about
the current window that a rich function is running on? I don't see
anything in the RuntimeContext classes about the current window... 

As you pointed out earlier, I need to attach a window ID (for
instance, the starting timestamp of the window) to each metric and
propagate it to the TDigest objects to be able to associate the
metrics with the right TDigest in the last stateful CoFlatMapFunction.
You mentioned I should compute the window information in the initial
fold function that computes the metrics, and while I can compute a
common window-start timestamp from the events in the metrics
computation, it would seem less ugly and error-prone if I could get
information from Flink about the current window the fold function is
running on.
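If the window ID does have to be derived from event timestamps, the usual tumbling-window alignment makes it deterministic: the start is the timestamp minus its remainder modulo the window size (shifted by an optional offset), so every operator using the same size and offset computes the same ID for a given event. A minimal sketch with a hypothetical function name, assuming epoch-millisecond timestamps; it only agrees with the keyed window if the size and offset match the window assigner's.

```python
def tumbling_window_start(timestamp_ms: int, size_ms: int, offset_ms: int = 0) -> int:
    """Start (inclusive) of the tumbling window [start, start + size_ms)
    that contains timestamp_ms, with windows aligned to offset_ms."""
    if size_ms <= 0:
        raise ValueError("size_ms must be positive")
    # Python's % returns a non-negative result for a positive divisor,
    # so this also works for timestamps before the epoch.
    return timestamp_ms - (timestamp_ms - offset_ms) % size_ms
```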

----- Original Message -----
From: "Gyula Fóra" <gy...@gmail.com>
To: "William Saar" <wi...@saar.se>, <us...@flink.apache.org>
Sent: Tue, 30 May 2017 13:56:08 +0000
Subject: Re: Porting batch percentile computation to streaming window



Re: Porting batch percentile computation to streaming window

Posted by Gyula Fóra <gy...@gmail.com>.
I think you could actually do a window operation to get the tDigestStream
from windowMetricsByIp:

windowMetricsByIp.windowAll(SameWindowAsTumblingTimeWindow).fold(...)

This way the watermark mechanism should ensure you get all partial results
before flushing the global window.
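A plain-Python simulation of why this should work, resting on two assumptions about Flink's window operator: results of the keyed window are emitted with the window's last millisecond (end - 1) as their timestamp, and an event-time window fires once the watermark reaches that same last millisecond. With the same window size downstream, each per-IP partial result then lands in the matching all-window, and that window only fires after the watermark has passed it (late data aside). Class and method names are hypothetical, and a list stands in for the TDigest.

```python
class EventTimeAllWindow:
    """Non-keyed tumbling event-time window that merges partial results
    and fires once the watermark has passed the window's last millisecond."""

    def __init__(self, size_ms, merge, initial):
        self.size_ms = size_ms
        self.merge = merge      # merge(acc, partial) -> new acc (no mutation)
        self.initial = initial  # accumulator for a fresh window
        self.pending = {}       # window start -> accumulator

    def on_element(self, timestamp_ms, partial):
        start = timestamp_ms - timestamp_ms % self.size_ms
        acc = self.pending.get(start, self.initial)
        self.pending[start] = self.merge(acc, partial)

    def on_watermark(self, watermark_ms):
        """Return [(window_start, result)] for every window that is complete."""
        fired = []
        for start in sorted(self.pending):  # sorted() copies, so popping is safe
            if start + self.size_ms - 1 <= watermark_ms:
                fired.append((start, self.pending.pop(start)))
        return fired
```

For example, with a 60 s window, per-IP results of window [0, 60000) carry timestamp 59999, so they are only merged into a global result once the watermark reaches 59999.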

Gyula

On Tue, 30 May 2017 at 15:03, William Saar <wi...@saar.se> wrote:


Re: Porting batch percentile computation to streaming window

Posted by William Saar <wi...@saar.se>.
> This logic now assumes that you get the TDigest result before
> getting any groupBy metric, which will probably not be the case so you
> could do some custom buffering in state. Depending on the rate of the
> stream this might or might not be feasible :)

Unfortunately, I think this assumption is a deal-breaker. The value
stream is not grouped, but I need to distribute the values to compute
the metrics, and I am populating the TDigest with the metrics.

Your suggestion gave me some ideas. Assume I have:
windowMetricsByIp = values.keyBy(ip).window(TumblingTimeWindow).fold(computeMetrics)
tDigestStream = windowMetricsByIp.global().flatMap(tDigestMapper) // How do I know when the flatMap has seen all values and should emit its result?
percentilesStream = tDigestStream.broadcast().connect(windowMetricsByIp).flatMap(...)

If I attach information about the current window to the metrics events
on line 1, can I perhaps use that information to make the flatMap on
line 2 decide when to emit its T-Digest? The crudest solution is to emit
the T-Digest for a window when the first event of the next window
arrives (will this cause problems with back-pressure?).
Less crudely, maybe I could store watermark information (or something
similar) on the metrics objects in line 1 and emit T-Digests more often
in line 2?
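A plain-Python sketch of the crude approach, with hypothetical names and a sorted list standing in for the T-Digest. It makes two weaknesses visible: the newest window is only emitted once an element of a later window arrives, and because global() merges results from several parallel upstream window instances, a metric for an already emitted window can still show up afterwards and is lost (recorded in dropped here). Watermarks address the second problem, since an operator's watermark only advances once all of its parallel inputs have advanced.

```python
class EmitOnNextWindow:
    """Emit the digest for a window when the first element of a newer
    window arrives (the 'crude' approach)."""

    def __init__(self):
        self.current_window = None  # window ID currently being collected
        self.values = []            # values seen for current_window
        self.dropped = []           # elements that arrived too late

    def on_element(self, window_id, value):
        if self.current_window is None:
            self.current_window = window_id
        if window_id < self.current_window:
            # Window already emitted: nothing sensible left to do with it.
            self.dropped.append((window_id, value))
            return []
        emitted = []
        if window_id > self.current_window:
            emitted.append((self.current_window, sorted(self.values)))
            self.current_window, self.values = window_id, []
        self.values.append(value)
        return emitted
```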

Finally, how do I access the watermark/window information in my fold
operation in line 1?

Thanks!

----- Original Message -----
From: "Gyula Fóra" <gy...@gmail.com>
To: "William Saar" <wi...@saar.se>, <us...@flink.apache.org>
Sent: Tue, 30 May 2017 08:56:28 +0000
Subject: Re: Porting batch percentile computation to streaming window



Re: Porting batch percentile computation to streaming window

Posted by Gyula Fóra <gy...@gmail.com>.
Hi William,

I think the feature you are looking for is basically side inputs, which are
not implemented yet, but let me try to give a workaround that might work.

If I understand correctly, you have two windowed computations:
TDigestStream = allMetrics.windowAll(...).reduce(...)
windowMetricsByIP = allMetrics.keyBy(ip).window(...).reduce(...)

And now you want to join these two by window to compute the percentiles.
Something like:

TDigestStream.broadcast().connect(windowMetricsByIP).flatMap(new JoiningCoFlatMap())

In your JoiningCoFlatMap you could keep a Map<Window, TDigest> in state, and
every per-IP metric aggregate could pick up the TDigest for its window.
All this assumes that you attach the window information to the aggregate
metrics and the TDigest (you can do this in the window reduce step).

This logic now assumes that you get the TDigest result before getting any
groupBy metric, which will probably not be the case so you could do some
custom buffering in state. Depending on the rate of the stream this might
or might not be feasible :)
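A plain-Python sketch of that JoiningCoFlatMap, including the buffering for per-IP metrics that arrive before their window's TDigest. The two methods stand in for the two inputs of a CoFlatMapFunction, a sorted list stands in for the TDigest, and the per-IP output is assumed to be a percentile rank; all names are hypothetical. In Flink both maps would have to live in checkpointed state, and old windows would have to be evicted, which this sketch leaves out.

```python
from bisect import bisect_right
from collections import defaultdict


class JoiningCoFlatMapSketch:
    """Joins broadcast per-window digests with per-IP metrics by window ID."""

    def __init__(self):
        self.digests = {}                  # window -> sorted global metrics
        self.buffered = defaultdict(list)  # window -> [(ip, metric)] waiting

    @staticmethod
    def _percentile(digest, metric):
        # Percentile rank (0-100) of metric within a non-empty sorted digest.
        return 100.0 * bisect_right(digest, metric) / len(digest)

    def on_digest(self, window, digest):
        """Input 1: the global digest for a window (broadcast to all instances)."""
        self.digests[window] = digest
        # Flush any per-IP metrics that arrived before this digest.
        waiting = self.buffered.pop(window, [])
        return [(window, ip, self._percentile(digest, m)) for ip, m in waiting]

    def on_metric(self, window, ip, metric):
        """Input 2: a per-IP aggregate tagged with its window."""
        digest = self.digests.get(window)
        if digest is None:
            self.buffered[window].append((ip, metric))  # digest not here yet
            return []
        return [(window, ip, self._percentile(digest, metric))]
```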

Does this sound reasonable? I hope I have understood the use-case correctly.
Gyula


On Mon, 29 May 2017 at 18:34, William Saar <wi...@saar.se> wrote:
