Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/12 11:44:57 UTC

[GitHub] [flink] zentol commented on a change in pull request #14618: [FLINK-20852][metrics][webui] Provide more detailed per subtask backpressure stats

zentol commented on a change in pull request #14618:
URL: https://github.com/apache/flink/pull/14618#discussion_r555708125



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
##########
@@ -49,50 +54,78 @@
                 JobVertexBackPressureInfo,
                 JobVertexMessageParameters> {
 
+    private final MetricFetcher metricFetcher;
+
     public JobVertexBackPressureHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters>
-                    messageHeaders) {
+                    messageHeaders,
+            MetricFetcher metricFetcher) {
         super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.metricFetcher = metricFetcher;
     }
 
     @Override
     protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(
             @Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
             @Nonnull RestfulGateway gateway)
             throws RestHandlerException {
+        metricFetcher.update();
+
         final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
         final JobVertexID jobVertexId = request.getPathParameter(JobVertexIdPathParameter.class);
-        return gateway.requestOperatorBackPressureStats(jobId, jobVertexId)
-                .thenApply(
-                        operatorBackPressureStats ->
-                                operatorBackPressureStats
-                                        .getOperatorBackPressureStats()
-                                        .map(
-                                                JobVertexBackPressureHandler
-                                                        ::createJobVertexBackPressureInfo)
-                                        .orElse(JobVertexBackPressureInfo.deprecated()));
+
+        TaskMetricStore taskMetricStore =
+                metricFetcher
+                        .getMetricStore()
+                        .getTaskMetricStore(jobId.toString(), jobVertexId.toString());
+
+        return CompletableFuture.completedFuture(
+                taskMetricStore != null
+                        ? createJobVertexBackPressureInfo(
+                                taskMetricStore.getAllSubtaskMetricStores())
+                        : JobVertexBackPressureInfo.deprecated());
     }
 
-    private static JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            final OperatorBackPressureStats operatorBackPressureStats) {
+    private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
+            Collection<ComponentMetricStore> allSubtaskMetricStores) {
+
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
-                getBackPressureLevel(operatorBackPressureStats.getMaxBackPressureRatio()),
-                operatorBackPressureStats.getEndTimestamp(),
-                IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks())
-                        .mapToObj(
-                                subtask -> {
-                                    final double backPressureRatio =
-                                            operatorBackPressureStats.getBackPressureRatio(subtask);
-                                    return new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                                            subtask,
-                                            getBackPressureLevel(backPressureRatio),
-                                            backPressureRatio);
-                                })
-                        .collect(Collectors.toList()));
+                getBackPressureLevel(getMaxBackPressureRatio(allSubtaskMetricStores)),
+                metricFetcher.getLastUpdateTime(),
+                createSubtaskBackPressureInfo(allSubtaskMetricStores));
+    }
+
+    private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
+            Collection<ComponentMetricStore> subtaskMetricStores) {
+        List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
+        int subTaskIndex = 0;
+        for (ComponentMetricStore subtaskMetricStore : subtaskMetricStores) {
+            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
+            result.add(
+                    new SubtaskBackPressureInfo(
+                            subTaskIndex,
+                            getBackPressureLevel(backPressureRatio),
+                            backPressureRatio));
+            subTaskIndex++;

Review comment:
       This could be incorrect if, at this point in time, only some of the subtasks have reported back: the counter would then assign consecutive indices to whichever subtasks happen to be present. We could change taskMetricStore.getAllSubtaskMetricStores() to also return the subtask index, or add a dedicated SubtaskMetricStore with a getter for the index.
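The difference between counting subtasks and keying them by their real index shows up as soon as one subtask has not reported yet. The sketch below is a minimal illustration, not Flink code. It assumes the metric store can hand back a map from subtask index to metrics. It uses a plain `Map<Integer, Double>` of back-pressure ratios as a stand-in for `ComponentMetricStore`. `SubtaskRatio` is a hypothetical substitute for `SubtaskBackPressureInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: keep the subtask index alongside each subtask's metrics. */
public class SubtaskIndexSketch {

    /** Hypothetical stand-in for SubtaskBackPressureInfo. */
    static final class SubtaskRatio {
        final int subtask;
        final double backPressureRatio;

        SubtaskRatio(int subtask, double backPressureRatio) {
            this.subtask = subtask;
            this.backPressureRatio = backPressureRatio;
        }

        @Override
        public String toString() {
            return "subtask " + subtask + " -> " + backPressureRatio;
        }
    }

    /** Counter-based indexing, as in the diff: index = position in iteration order. */
    static List<SubtaskRatio> byIterationOrder(Map<Integer, Double> ratiosBySubtask) {
        List<SubtaskRatio> result = new ArrayList<>();
        int subTaskIndex = 0;
        for (double ratio : ratiosBySubtask.values()) {
            result.add(new SubtaskRatio(subTaskIndex++, ratio));
        }
        return result;
    }

    /** Key-based indexing: the index comes from the store itself, so gaps are preserved. */
    static List<SubtaskRatio> byReportedIndex(Map<Integer, Double> ratiosBySubtask) {
        List<SubtaskRatio> result = new ArrayList<>();
        // TreeMap iterates in ascending key order, i.e. by subtask index.
        for (Map.Entry<Integer, Double> e : new TreeMap<>(ratiosBySubtask).entrySet()) {
            result.add(new SubtaskRatio(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Only subtasks 0 and 2 have reported metrics so far; subtask 1 is missing.
        Map<Integer, Double> reported = new TreeMap<>(Map.of(0, 0.1, 2, 0.8));
        // Subtask 2's ratio ends up labelled as subtask 1.
        System.out.println(byIterationOrder(reported));
        // Indices 0 and 2 are preserved.
        System.out.println(byReportedIndex(reported));
    }
}
```

Carrying the index in the key, or in a dedicated store object as suggested above, also lets the UI show an explicit gap for a subtask that has not reported.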

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
##########
@@ -124,8 +128,19 @@ private double getMaxBackPressureRatio(
     }
 
     private double getBackPressureRatio(ComponentMetricStore metricStore) {
-        return Double.valueOf(metricStore.getMetric(MetricNames.TASK_BACK_PRESSURED_TIME, "0"))
-                / 1_000;
+        return getDoubleMetric(metricStore, MetricNames.TASK_BACK_PRESSURED_TIME);
+    }
+
+    private double getIdleRatio(ComponentMetricStore metricStore) {
+        return getDoubleMetric(metricStore, MetricNames.TASK_IDLE_TIME);
+    }
+
+    private double getBusyRatio(ComponentMetricStore metricStore) {
+        return getDoubleMetric(metricStore, MetricNames.TASK_BUSY_TIME);
+    }
+
+    private double getDoubleMetric(ComponentMetricStore metricStore, String metricName) {

Review comment:
       This does a bit more than loading a double metric.
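The removed lines show what the old back-pressure helper did. It parsed the metric string with a default of `"0"`, then divided by `1_000` to turn a "milliseconds per second" value into a ratio. The body of the new `getDoubleMetric` is not part of this excerpt. Assuming it keeps that behaviour, one option is to split loading from conversion so that each name says what the method does. In the sketch below, `loadDoubleMetric` and `msPerSecondToRatio` are hypothetical names, and a `Map<String, String>` stands in for `ComponentMetricStore`. The key `"backPressuredTimeMsPerSecond"` is an assumed value for `MetricNames.TASK_BACK_PRESSURED_TIME`.

```java
import java.util.Map;

/** Hypothetical sketch: separate "load the raw metric" from "convert it to a ratio". */
public class RatioMetricSketch {

    // Assumed metric key; the real handler uses MetricNames.TASK_BACK_PRESSURED_TIME.
    static final String BACK_PRESSURED_TIME = "backPressuredTimeMsPerSecond";

    /** Loads a metric as a double, defaulting to 0 when it has not been reported (like the old code). */
    static double loadDoubleMetric(Map<String, String> metricStore, String metricName) {
        return Double.parseDouble(metricStore.getOrDefault(metricName, "0"));
    }

    /** Converts a "milliseconds per second" value into a ratio (1000 ms/s -> 1.0). */
    static double msPerSecondToRatio(double msPerSecond) {
        return msPerSecond / 1_000;
    }

    /** Composes the two steps; the name now reflects both loading and conversion. */
    static double getBackPressureRatio(Map<String, String> metricStore) {
        return msPerSecondToRatio(loadDoubleMetric(metricStore, BACK_PRESSURED_TIME));
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of(BACK_PRESSURED_TIME, "250");
        System.out.println(getBackPressureRatio(store));    // 0.25
        System.out.println(getBackPressureRatio(Map.of())); // 0.0 (metric missing)
    }
}
```

The idle and busy ratios would reuse the same two steps with their own metric keys.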

##########
File path: flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.ts
##########
@@ -41,6 +41,15 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy
     return node.subtask;
   }
 
+  prettyPrint(value: number): string {
+    if (isNaN(value)) {
+      return "N/A"
+    }
+    else {
+      return Math.round(value * 100) + "%";

Review comment:
       Does this round up or down? (I suppose we should ideally round down so that we never accidentally show 100%.)
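For reference, JavaScript's `Math.round` rounds to the nearest integer, and Java's `Math.round` does the same. In both, ties go toward positive infinity. As a result, `prettyPrint(0.996)` would display "100%".

Plain flooring has its own trap. In IEEE-754 doubles, `0.29 * 100` evaluates to `28.999999999999996`, so `Math.floor` would print "28%". The sketch below compares three variants. It is written in Java to match the other sketches, but the arithmetic carries over to the TypeScript component unchanged. The method names are hypothetical. `roundCapped` is one possible compromise: round to nearest, but never show 100% unless the ratio really is 1.

```java
/** Hypothetical sketch of percentage formatting for a ratio in [0, 1]. */
public class PercentFormatSketch {

    /** Same arithmetic as prettyPrint in the diff: round to nearest, ties up. */
    static String roundToNearest(double ratio) {
        // Java's Math.round(NaN) returns 0, so NaN must be handled before rounding.
        if (Double.isNaN(ratio)) {
            return "N/A";
        }
        return Math.round(ratio * 100) + "%";
    }

    /** Always round down; never shows 100% below 1.0, but 0.29 becomes 28%. */
    static String roundDown(double ratio) {
        if (Double.isNaN(ratio)) {
            return "N/A";
        }
        return (long) Math.floor(ratio * 100) + "%";
    }

    /** Round to nearest, but cap at 99% unless the ratio is actually 1.0 or more. */
    static String roundCapped(double ratio) {
        if (Double.isNaN(ratio)) {
            return "N/A";
        }
        long percent = Math.round(ratio * 100);
        if (ratio < 1.0 && percent >= 100) {
            percent = 99;
        }
        return percent + "%";
    }

    public static void main(String[] args) {
        System.out.println(roundToNearest(0.996)); // 100%
        System.out.println(roundDown(0.996));      // 99%
        System.out.println(roundCapped(0.996));    // 99%
        System.out.println(roundCapped(0.29));     // 29%
        System.out.println(roundCapped(1.0));      // 100%
    }
}
```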



