Posted to user@flink.apache.org by Salva Alcántara <sa...@gmail.com> on 2021/02/17 05:09:32 UTC

How to report metric based on keyed state piece

I wonder what the canonical way is to accomplish the following:

Given a Flink UDF, how can one report a metric `y` that is a function of some
(keyed) state `X`, i.e., `y = f(X)`? Most commonly, we are interested in
the size of the state `X`.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) being the aggregated size (i.e., the total size
of the `MapState` across all keys)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to report metric based on keyed state piece

Posted by Salva Alcántara <sa...@gmail.com>.
Many thanks, Kezhu, for pointing me in that direction!




Re: How to report metric based on keyed state piece

Posted by Salva Alcántara <sa...@gmail.com>.
Awesome, Piotr!




Re: How to report metric based on keyed state piece

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Salva,

I'm not sure, but I think you cannot access the state (especially the
keyed state) from within the metric, as metrics are evaluated outside
of the keyed context, and also from another thread. Also, `ValueState`
and `MapState` do not expose their size.

So you would probably have to follow Kezhu's suggestion. Whenever you
update your state value, you can also update a shared variable that tracks
the combined size (an `AtomicLong`?). Upon recovery you would need to
reinitialize it (perhaps indeed with `KeyedStateBackend.applyToAllKeys`).

Piotrek
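A minimal sketch of Piotr's suggestion, written so it runs without Flink on the classpath. The class name `TrackedMapState` is hypothetical, a nested `HashMap` stands in for the keyed `MapState`, and a `LongSupplier` stands in for a Flink `Gauge<Long>`. The idea is that every write that adds or removes a map entry also adjusts a single `AtomicLong`, which the metric thread can read safely without touching keyed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of one parallel subtask: keyed map state (simulated with
 * nested HashMaps) plus a shared counter of entries across all its keys.
 */
class TrackedMapState<K, UK, UV> {
    // Stand-in for Flink's keyed MapState: one inner map per key.
    private final Map<K, Map<UK, UV>> backing = new HashMap<>();
    // Shared counter; in Flink, the metric reporter thread would only read it.
    private final AtomicLong totalEntries = new AtomicLong();

    /** Inserts or overwrites; only a genuinely new entry grows the total. */
    void put(K key, UK userKey, UV value) {
        Map<UK, UV> perKey = backing.computeIfAbsent(key, k -> new HashMap<>());
        if (!perKey.containsKey(userKey)) {
            totalEntries.incrementAndGet();
        }
        perKey.put(userKey, value);
    }

    /** Removes an entry; only an entry that actually existed shrinks the total. */
    void remove(K key, UK userKey) {
        Map<UK, UV> perKey = backing.get(key);
        if (perKey != null && perKey.containsKey(userKey)) {
            perKey.remove(userKey);
            totalEntries.decrementAndGet();
        }
    }

    /** What a Flink Gauge<Long> would report: the current counter value. */
    LongSupplier gauge() {
        return totalEntries::get;
    }
}
```

In a real `RichCoFlatMapFunction`, the counter would presumably be a field of the function, exposed in `open()` by registering a `Gauge<Long>` that returns its value via `getRuntimeContext().getMetricGroup().gauge(...)`. Each parallel subtask then reports its own total, so the job-wide figure has to be summed in the metrics system. The check-before-put also costs an extra `MapState.contains` lookup per write on a real state backend, and the counter is not part of checkpointed state, which is why it must be reinitialized on recovery.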



On Wed, 17 Feb 2021 at 14:13, Kezhu Wang <ke...@gmail.com> wrote:

> With an initial `y`, I think you could compute the new `y` on each new stream
> value. Upon recovering from a checkpoint, maybe
> `KeyedStateBackend.applyToAllKeys` could help you rebuild an initial `y`.
>
> Best,
> Kezhu Wang

Re: How to report metric based on keyed state piece

Posted by Kezhu Wang <ke...@gmail.com>.
With an initial `y`, I think you could compute the new `y` on each new stream value.
Upon recovering from a checkpoint, maybe `KeyedStateBackend.applyToAllKeys`
could help you rebuild an initial `y`.

Best,
Kezhu Wang
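Kezhu's variant can be sketched the same way, again as a hypothetical, Flink-free model (the class name `IncrementalTotal` is illustrative). Here `y` is maintained incrementally from the per-key size change caused by each record, and after recovery it is recomputed by visiting every key once. The `rebuild` method only imitates the role that `KeyedStateBackend.applyToAllKeys` would play; it does not call that method, and a plain `Map` of per-key maps stands in for the restored keyed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch: y = total number of map entries across all keys,
 * updated by deltas per record and rebuilt by a full scan after recovery.
 */
class IncrementalTotal {
    private final AtomicLong y = new AtomicLong();

    /** Per-record update: apply the size change of the key just touched. */
    void onKeyResized(int oldSize, int newSize) {
        y.addAndGet(newSize - oldSize);
    }

    /** Recovery: visit every key's restored map once and sum the sizes. */
    void rebuild(Map<?, ? extends Map<?, ?>> restoredState) {
        long total = 0;
        for (Map<?, ?> perKey : restoredState.values()) {
            total += perKey.size();
        }
        y.set(total);
    }

    /** Current value of y, e.g. what a gauge would report. */
    long current() {
        return y.get();
    }
}
```

This only works when `f` can be updated from a delta, as a count or a sum can; the full scan is still needed after recovery because the counter itself is not checkpointed.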
