You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Joris Geer <jo...@oracle.com> on 2020/07/19 23:26:54 UTC

Custom metrics output

Hi,

We want to collect metrics for stream processing, typically counts aggregated over 1-minute buckets. However, we want these 1-minute boundaries determined by timestamps within the data records. Flink metrics do not handle this so we want to roll our own. How to proceed ? Some of our team members believe we can add methods in operator class code that can be called from the main Flink program, whist I am not sure this is supposed to be possible. Others consider using a side output stream with a record per input record and use Flink operators to do the aggregation. That may double the amount of records processed.

Can we extend the Flink metrics to provide such aggregation ?

Regards,

Joris


Re: Custom metrics output

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Joris,

I don't think that the approach of "add methods in operator class code that
can be called from the main Flink program" will work.

The most efficient approach would be implementing a ProcessFunction that
counts in 1-min time buckets (using event-time semantics) and updates the
metrics.
If you need the metric values to be exact, you can keep the intermediate
counts as operator state.
I would not use a KeyedProcessFunction because you didn't mention a key and
to save the overhead of the shuffle.

You can integrate the ProcessFunctions in different ways in your job.

1) just embed it into the regular flow. The ProcessFunction would just
count and forward every record it receives.
2) fork off a stream of records that just just hold the timestamp to a side
output and apply the ProcessFunction on the forked-off stream.

I think the first approach is simpler and more efficient. The
ProcessFunction would be an identity function to your actual data, just
counting and reporting metrics.

Best, Fabian

Am Mo., 20. Juli 2020 um 01:30 Uhr schrieb Joris Geer <
joris.van.der.geer@oracle.com>:

> Hi,
>
> We want to collect metrics for stream processing, typically counts
> aggregated over 1-minute buckets. However, we want these 1-minute
> boundaries determined by timestamps within the data records. Flink metrics
> do not handle this so we want to roll our own. How to proceed ? Some of our
> team members believe we can add methods in operator class code that can be
> called from the main Flink program, whist I am not sure this is supposed to
> be possible. Others consider using a side output stream with a record per
> input record and use Flink operators to do the aggregation. That may double
> the amount of records processed.
>
> Can we extend the Flink metrics to provide such aggregation ?
>
> Regards,
>
> Joris
>
>