Posted to issues@flink.apache.org by "Aljoscha Krettek (Jira)" <ji...@apache.org> on 2020/05/18 08:12:00 UTC

[jira] [Closed] (FLINK-17348) Expose metric group to ascendingTimestampExtractor

     [ https://issues.apache.org/jira/browse/FLINK-17348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-17348.
------------------------------------
    Fix Version/s: 1.11.0
       Resolution: Fixed

This was added with the new interfaces of FLIP-126: https://issues.apache.org/jira/browse/FLINK-17653. You can now specify suppliers for both {{TimestampAssigner}} and {{WatermarkGenerator}}, and these suppliers get access to the metric group.

Could you check if this works for you? If not, please re-open this issue.
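
For illustration only, here is a minimal, self-contained sketch of the counting logic the request asks for: an assigner that is handed a metric group when it is created and counts ascending-timestamp violations instead of logging them. The types {{Counter}}, {{MetricGroup}} and {{CountingAscendingAssigner}} are hypothetical stand-ins written for this example, not the Flink classes; in Flink 1.11 the metric group would presumably come from the context passed to the supplier, and those exact signatures are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: every nested type below is a hypothetical stand-in, NOT a Flink class.
final class ViolationMetricSketch {

    /** Stand-in for a metric counter. */
    static final class Counter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Stand-in for a metric group that hands out counters by name. */
    static final class MetricGroup {
        private final Map<String, Counter> counters = new HashMap<>();
        Counter counter(String name) {
            return counters.computeIfAbsent(name, n -> new Counter());
        }
    }

    /** Expects ascending timestamps; counts violations instead of logging them. */
    static final class CountingAscendingAssigner {
        private final Counter violations;
        private long currentMax = Long.MIN_VALUE;

        // The metric group is passed in at creation time, which is the
        // access the original extractor lacked.
        CountingAscendingAssigner(MetricGroup metrics) {
            this.violations = metrics.counter("ascendingTimestampViolations");
        }

        long extractTimestamp(long elementTimestamp) {
            if (elementTimestamp < currentMax) {
                violations.inc();               // out of order: count it
            } else {
                currentMax = elementTimestamp;  // in order: advance the maximum
            }
            return elementTimestamp;
        }
    }

    /** Feeds the timestamps through one assigner and returns the violation count. */
    static long countViolations(long... timestamps) {
        MetricGroup metrics = new MetricGroup();
        CountingAscendingAssigner assigner = new CountingAscendingAssigner(metrics);
        for (long ts : timestamps) {
            assigner.extractTimestamp(ts);
        }
        return metrics.counter("ascendingTimestampViolations").getCount();
    }
}
```

In a real job, one such assigner would exist per source split or partition, and the counter would be what the monitoring alarm is defined on.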

> Expose metric group to ascendingTimestampExtractor
> --------------------------------------------------
>
>                 Key: FLINK-17348
>                 URL: https://issues.apache.org/jira/browse/FLINK-17348
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Theo Diefenthal
>            Priority: Major
>             Fix For: 1.11.0
>
>
> A common use case with Flink and Kafka is having many Kafka partitions, each with ascending timestamps.
> In my scenario, for various operational reasons, we load log files from the filesystem into Kafka, one server per partition, and then consume them in Flink.
> Sometimes we collect the files into Kafka in the wrong order, which violates the ascending-timestamp assumption. If that happens while the default logging violation handler is enabled, we produce several GB of logs in a very short time, which we would like to avoid.
> What we really want: track the number of violations in a metric and define an alarm on it in our monitoring dashboard.
> Currently, there is sadly no way to reference the metric group from the ascending timestamp extractor. I wish there were something similar to the open method on other rich functions.
> My current workaround is to add a custom map task after the source. For that task I need to pass on the Kafka partition from the source, which I usually don't care about, and I need to keep track of each partition's current timestamp manually, exactly the same way the extractor does. -> The workaround "pollutes" my pipeline quite a bit just for a single metric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)