Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2020/08/07 14:38:00 UTC

[jira] [Comment Edited] (FLINK-18808) Task-level numRecordsOut metric may be underestimated

    [ https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173177#comment-17173177 ] 

Piotr Nowojski edited comment on FLINK-18808 at 8/7/20, 2:37 PM:
-----------------------------------------------------------------

+1 to what was written above. It looks like this change should work (a unit test should confirm that) and it makes sense.

I would also lean towards keeping the record count for broadcast exchanges as it is, if not for the fact that the UI names the metric differently. I would expect {{Records Sent}} to be a network-level metric that accounts for the multiplication of records by broadcast, since the task really is sending all of those broadcast records. In the code and at the metric level ({{numRecordsOut}}), the current semantics make more sense, as that is the number of records actually produced by the task.

So I would suggest renaming the metric in the UI to {{"Produced Records"}} or something like that.
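
To make the distinction concrete, here is a minimal sketch of the two possible meanings of the count for a broadcast exchange. The class and method names are hypothetical and are not part of Flink; it only assumes that each broadcast record is delivered once to every downstream channel.

```java
public class BroadcastCountSketch {

    /**
     * "Produced" view: one count per record the task emits, regardless of
     * how many downstream channels receive it. This is the counting the
     * thread describes as the current numRecordsOut behaviour.
     */
    static long producedRecords(long emittedRecords) {
        return emittedRecords;
    }

    /**
     * "Sent" (network-level) view: every broadcast record is counted once
     * per downstream channel. Assumption for illustration only.
     */
    static long sentRecords(long emittedRecords, int downstreamChannels) {
        return Math.multiplyExact(emittedRecords, (long) downstreamChannels);
    }

    public static void main(String[] args) {
        long emitted = 1000;
        int channels = 4;
        System.out.println("produced: " + producedRecords(emitted));
        System.out.println("sent:     " + sentRecords(emitted, channels));
    }
}
```

With a label such as "Produced Records", the UI would match the first method; a label such as "Records Sent" suggests the second.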


> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Priority: Major
>              Labels: usability
>         Attachments: image-2020-08-04-11-28-13-800.png, image-2020-08-04-11-32-20-678.png
>
>
> At present, we register the task-level numRecordsOut metric only by reusing the output record counter of the operator at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If data leaves the task only through the last operator of the OperatorChain, this statistic is correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, data is sent not only by the last operator but also by an operator in the middle of the OperatorChain (the map operator just returns the original value directly). Below is one of our test topologies; the statistic shows only half of the total data received by the downstream tasks.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by the intermediate operator should also be counted in the numRecordsOut of the Task. But currently we do not reuse the output record counters of intermediate operators, so the task-level numRecordsOut metric is underestimated (this has no effect on the actual operation of the job, but it may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing the operator's output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> In addition, I have another question: if a record is broadcast to all downstream tasks, should the numRecordsOut counter increase by one or by the number of downstream tasks? It seems that currently we add one per record.
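
The undercount described in the issue can be reproduced with a small self-contained model. This is only a sketch: {{ChainedOperator}} and the method names are hypothetical stand-ins rather than Flink classes, and the two-operator topology (a source with its own non-chained output, chained to a map that ends the chain) is an assumption based on the description above, since the attached images are not available here.

```java
import java.util.Arrays;
import java.util.List;

public class TaskRecordsOutSketch {

    /** Hypothetical stand-in for one operator in a chain; not a Flink class. */
    static final class ChainedOperator {
        final String name;
        final boolean chainEnd;       // plays the role of config.isChainEnd()
        final int nonChainedOutputs;  // number of outputs that leave the task
        final long recordsOut;        // operator-level output record count

        ChainedOperator(String name, boolean chainEnd, int nonChainedOutputs, long recordsOut) {
            this.name = name;
            this.chainEnd = chainEnd;
            this.nonChainedOutputs = nonChainedOutputs;
            this.recordsOut = recordsOut;
        }
    }

    /** Condition quoted first in the issue: only the chain-end operator feeds the task counter. */
    static long taskRecordsOutChainEndOnly(List<ChainedOperator> chain) {
        long total = 0;
        for (ChainedOperator op : chain) {
            if (op.chainEnd) {
                total += op.recordsOut;
            }
        }
        return total;
    }

    /** Proposed condition: every operator with a non-chained output feeds the task counter. */
    static long taskRecordsOutNonChainedOutputs(List<ChainedOperator> chain) {
        long total = 0;
        for (ChainedOperator op : chain) {
            if (op.nonChainedOutputs > 0) {
                total += op.recordsOut;
            }
        }
        return total;
    }

    /** Assumed topology: source -> downstream, and source -> (chained) map -> downstream. */
    static List<ChainedOperator> exampleChain(long records) {
        return Arrays.asList(
            new ChainedOperator("source", false, 1, records),
            new ChainedOperator("map", true, 1, records));
    }

    public static void main(String[] args) {
        List<ChainedOperator> chain = exampleChain(100);
        System.out.println("chain-end only:      " + taskRecordsOutChainEndOnly(chain));
        System.out.println("non-chained outputs: " + taskRecordsOutNonChainedOutputs(chain));
    }
}
```

In this model the chain-end condition reports half of what the downstream tasks receive, which matches the symptom reported in the issue, while the proposed condition counts both operators.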


