You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/09/27 15:37:10 UTC

[flink] 05/06: [FLINK-13386][web]: Add all operators watermark

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b35c7ed9cbd412435ba70521270976218ccfa60
Author: vthinkxie <ya...@alibaba-inc.com>
AuthorDate: Sat Jul 27 17:56:26 2019 +0800

    [FLINK-13386][web]: Add all operators watermark
---
 .../web-dashboard/src/app/interfaces/job-detail.ts |  1 +
 .../pages/job/overview/job-overview.component.ts   | 53 +++++++++++++++++++---
 .../src/app/share/common/dagre/node.component.html |  1 +
 .../src/app/share/common/dagre/node.component.less |  7 ++-
 .../src/app/share/common/dagre/node.component.ts   |  2 +
 5 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index 3bf9c13..1429104 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -129,6 +129,7 @@ export interface NodesItemInterface {
 
 export interface NodesItemCorrectInterface extends NodesItemInterface {
   detail: VerticesItemInterface | undefined;
+  lowWatermark?: number;
 }
 
 export interface NodesItemLinkInterface {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 5174653..a0765c4 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -26,10 +26,11 @@ import {
   ViewChild
 } from '@angular/core';
 import { ActivatedRoute, Router } from '@angular/router';
-import { Subject } from 'rxjs';
-import { filter, takeUntil } from 'rxjs/operators';
+import { LONG_MIN_VALUE } from 'config';
+import { forkJoin, Observable, of, Subject } from 'rxjs';
+import { catchError, filter, map, takeUntil } from 'rxjs/operators';
 import { NodesItemCorrectInterface, NodesItemLinkInterface } from 'interfaces';
-import { JobService } from 'services';
+import { JobService, MetricsService } from 'services';
 import { DagreComponent } from 'share/common/dagre/dagre.component';
 
 @Component({
@@ -62,11 +63,52 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
     }
   }
 
+  mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
+    return forkJoin(
+      nodes.map(node => {
+        const listOfMetricId = [];
+        let lowWatermark = NaN;
+        for (let i = 0; i < node.parallelism; i++) {
+          listOfMetricId.push(`${i}.currentInputWatermark`);
+        }
+        return this.metricService.getMetrics(this.jobId, node.id, listOfMetricId).pipe(
+          map(metrics => {
+            let minValue = NaN;
+            const watermarks: { [index: string]: number } = {};
+            for (const key in metrics.values) {
+              const value = metrics.values[key];
+              const subtaskIndex = key.replace('.currentInputWatermark', '');
+              watermarks[subtaskIndex] = value;
+              if (isNaN(minValue) || value < minValue) {
+                minValue = value;
+              }
+            }
+            if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
+              lowWatermark = minValue;
+            } else {
+              lowWatermark = NaN;
+            }
+            return { ...node, lowWatermark };
+          })
+        );
+      })
+    ).pipe(catchError(() => of(nodes)));
+  }
+
+  refreshNodesWithWatermarks() {
+    this.mergeWithWatermarks(this.nodes).subscribe(nodes => {
+      nodes.forEach(node => {
+        this.dagreComponent.updateNode(node.id, node);
+      });
+    });
+  }
+
   constructor(
     private jobService: JobService,
     private router: Router,
     private activatedRoute: ActivatedRoute,
     public elementRef: ElementRef,
+    private metricService: MetricsService,
     private cdr: ChangeDetectorRef
   ) {}
 
@@ -82,11 +124,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
           this.links = data.plan.links;
           this.jobId = data.plan.jid;
           this.dagreComponent.flush(this.nodes, this.links, true).then();
+          this.refreshNodesWithWatermarks();
         } else {
           this.nodes = data.plan.nodes;
-          this.nodes.forEach(node => {
-            this.dagreComponent.updateNode(node.id, node);
-          });
+          this.refreshNodesWithWatermarks();
         }
         this.cdr.markForCheck();
       });
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
index 799ad2b..3b57b46 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
@@ -23,6 +23,7 @@
         <xhtml:div class="detail">{{operator}}</xhtml:div>
         <xhtml:div class="detail description">{{description}}</xhtml:div>
         <xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
+        <xhtml:div class="node-label watermark" *ngIf="lowWatermark">Low Watermark <xhtml:br/> {{lowWatermark}}</xhtml:div>
         <xhtml:div class="detail last" *ngIf="operatorStrategy">Operation: {{operatorStrategy}}</xhtml:div>
       </h4>
     </xhtml:div>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
index 0f10e2d..0533e05 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
@@ -50,12 +50,17 @@
       &:first-child {
         margin-top: 24px;
       }
+
+      &.watermark {
+        font-weight: normal;;
+        font-weight: 12px;
+        color: @text-color-secondary;
+      }
     }
 
     .detail {
       margin-bottom: 12px;
       color: @text-color;
-
       &.description {
         color: @heading-color;
       }
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
index 2f96b12..f24ef24 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
@@ -30,6 +30,7 @@ export class NodeComponent {
   operator: string | null;
   operatorStrategy: string | null;
   parallelism: number | null;
+  lowWatermark: number | null | undefined;
   height = 0;
   id: string;
 
@@ -45,6 +46,7 @@ export class NodeComponent {
     this.operator = this.decodeHTML(value.operator);
     this.operatorStrategy = this.decodeHTML(value.operator_strategy);
     this.parallelism = value.parallelism;
+    this.lowWatermark = value.lowWatermark;
     this.height = value.height || 0;
     this.id = value.id;
     if (description && description.length > 300) {