You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/09/27 15:37:10 UTC
[flink] 05/06: [FLINK-13386][web]: Add all operators watermark
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8b35c7ed9cbd412435ba70521270976218ccfa60
Author: vthinkxie <ya...@alibaba-inc.com>
AuthorDate: Sat Jul 27 17:56:26 2019 +0800
[FLINK-13386][web]: Add all operators watermark
---
.../web-dashboard/src/app/interfaces/job-detail.ts | 1 +
.../pages/job/overview/job-overview.component.ts | 53 +++++++++++++++++++---
.../src/app/share/common/dagre/node.component.html | 1 +
.../src/app/share/common/dagre/node.component.less | 7 ++-
.../src/app/share/common/dagre/node.component.ts | 2 +
5 files changed, 57 insertions(+), 7 deletions(-)
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index 3bf9c13..1429104 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -129,6 +129,7 @@ export interface NodesItemInterface {
export interface NodesItemCorrectInterface extends NodesItemInterface {
detail: VerticesItemInterface | undefined;
+ lowWatermark?: number;
}
export interface NodesItemLinkInterface {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 5174653..a0765c4 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -26,10 +26,11 @@ import {
ViewChild
} from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
-import { Subject } from 'rxjs';
-import { filter, takeUntil } from 'rxjs/operators';
+import { LONG_MIN_VALUE } from 'config';
+import { forkJoin, Observable, of, Subject } from 'rxjs';
+import { catchError, filter, map, takeUntil } from 'rxjs/operators';
import { NodesItemCorrectInterface, NodesItemLinkInterface } from 'interfaces';
-import { JobService } from 'services';
+import { JobService, MetricsService } from 'services';
import { DagreComponent } from 'share/common/dagre/dagre.component';
@Component({
@@ -62,11 +63,52 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
}
}
+ mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
+ return forkJoin(
+ nodes.map(node => {
+ const listOfMetricId = [];
+ let lowWatermark = NaN;
+ for (let i = 0; i < node.parallelism; i++) {
+ listOfMetricId.push(`${i}.currentInputWatermark`);
+ }
+ return this.metricService.getMetrics(this.jobId, node.id, listOfMetricId).pipe(
+ map(metrics => {
+ let minValue = NaN;
+ const watermarks: { [index: string]: number } = {};
+ for (const key in metrics.values) {
+ const value = metrics.values[key];
+ const subtaskIndex = key.replace('.currentInputWatermark', '');
+ watermarks[subtaskIndex] = value;
+ if (isNaN(minValue) || value < minValue) {
+ minValue = value;
+ }
+ }
+ if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
+ lowWatermark = minValue;
+ } else {
+ lowWatermark = NaN;
+ }
+ return { ...node, lowWatermark };
+ })
+ );
+ })
+ ).pipe(catchError(() => of(nodes)));
+ }
+
+ refreshNodesWithWatermarks() {
+ this.mergeWithWatermarks(this.nodes).subscribe(nodes => {
+ nodes.forEach(node => {
+ this.dagreComponent.updateNode(node.id, node);
+ });
+ });
+ }
+
constructor(
private jobService: JobService,
private router: Router,
private activatedRoute: ActivatedRoute,
public elementRef: ElementRef,
+ private metricService: MetricsService,
private cdr: ChangeDetectorRef
) {}
@@ -82,11 +124,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
this.links = data.plan.links;
this.jobId = data.plan.jid;
this.dagreComponent.flush(this.nodes, this.links, true).then();
+ this.refreshNodesWithWatermarks();
} else {
this.nodes = data.plan.nodes;
- this.nodes.forEach(node => {
- this.dagreComponent.updateNode(node.id, node);
- });
+ this.refreshNodesWithWatermarks();
}
this.cdr.markForCheck();
});
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
index 799ad2b..3b57b46 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
@@ -23,6 +23,7 @@
<xhtml:div class="detail">{{operator}}</xhtml:div>
<xhtml:div class="detail description">{{description}}</xhtml:div>
<xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
+ <xhtml:div class="node-label watermark" *ngIf="lowWatermark">Low Watermark <xhtml:br/> {{lowWatermark}}</xhtml:div>
<xhtml:div class="detail last" *ngIf="operatorStrategy">Operation: {{operatorStrategy}}</xhtml:div>
</h4>
</xhtml:div>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
index 0f10e2d..0533e05 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
@@ -50,12 +50,17 @@
&:first-child {
margin-top: 24px;
}
+
+ &.watermark {
+ font-weight: normal;;
+ font-weight: 12px;
+ color: @text-color-secondary;
+ }
}
.detail {
margin-bottom: 12px;
color: @text-color;
-
&.description {
color: @heading-color;
}
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
index 2f96b12..f24ef24 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
@@ -30,6 +30,7 @@ export class NodeComponent {
operator: string | null;
operatorStrategy: string | null;
parallelism: number | null;
+ lowWatermark: number | null | undefined;
height = 0;
id: string;
@@ -45,6 +46,7 @@ export class NodeComponent {
this.operator = this.decodeHTML(value.operator);
this.operatorStrategy = this.decodeHTML(value.operator_strategy);
this.parallelism = value.parallelism;
+ this.lowWatermark = value.lowWatermark;
this.height = value.height || 0;
this.id = value.id;
if (description && description.length > 300) {