You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/01/05 11:24:38 UTC
[flink] 03/03: [FLINK-30185][ui] Allow choose the subtask index in the flame graph page
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 898fe6ba6c882b82a266fb5a415d68e9c9113cb8
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Dec 17 22:41:00 2022 +0800
[FLINK-30185][ui] Allow choose the subtask index in the flame graph page
This closes #21447
---
.../job-overview-drawer-flamegraph.component.html | 18 +++++++
.../job-overview-drawer-flamegraph.component.less | 2 +-
.../job-overview-drawer-flamegraph.component.ts | 59 ++++++++++++++++++++--
.../web-dashboard/src/app/services/job.service.ts | 11 ++++
4 files changed, 86 insertions(+), 4 deletions(-)
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
index 92cd970e53b..3c34fbc40ba 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
@@ -48,6 +48,24 @@ Type:
</label>
</nz-radio-group>
+Subtask:
+<nz-select
+ nzSize="small"
+ [ngModel]="subtaskIndex"
+ nzShowSearch
+ (ngModelChange)="selectSubtask($event)"
+>
+ <nz-option
+ *ngFor="let name of listOfRunningSubtasks"
+ [nzLabel]="name"
+ [nzValue]="name"
+ nzCustomContent
+ >
+ <span [title]="name">{{ name }}</span>
+ </nz-option>
+</nz-select>
+
+
<ng-container [ngSwitch]="flameGraph.endTimestamp">
<span *ngSwitchCase="-1">The task has already been terminated</span>
<span *ngSwitchCase="-2">
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
index 58c534074c4..73716a18ca1 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
@@ -19,7 +19,7 @@
@import "theme";
nz-select {
- width: 300px;
+ width: 100px;
}
.metric-selector {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
index acc182e6a6a..b4a7e92b46a 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-import { NgIf, NgSwitch, NgSwitchCase, NgSwitchDefault } from '@angular/common';
+import { NgForOf, NgIf, NgSwitch, NgSwitchCase, NgSwitchDefault } from '@angular/common';
import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
import { FormsModule } from '@angular/forms';
import { Subject } from 'rxjs';
@@ -26,7 +26,9 @@ import { FlameGraphComponent } from '@flink-runtime-web/components/flame-graph/f
import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe';
import { FlameGraphType, JobFlameGraph, NodesItemCorrect } from '@flink-runtime-web/interfaces';
import { JobService } from '@flink-runtime-web/services';
+import { isNil } from '@flink-runtime-web/utils';
import { NzRadioModule } from 'ng-zorro-antd/radio';
+import { NzSelectModule } from 'ng-zorro-antd/select';
import { NzSpinModule } from 'ng-zorro-antd/spin';
import { JobLocalService } from '../../job-local.service';
@@ -38,6 +40,8 @@ import { JobLocalService } from '../../job-local.service';
styleUrls: ['./job-overview-drawer-flamegraph.component.less'],
imports: [
NgIf,
+ NgForOf,
+ NzSelectModule,
NzRadioModule,
FormsModule,
NgSwitch,
@@ -55,8 +59,11 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
public now = Date.now();
public selectedVertex: NodesItemCorrect | null;
public flameGraph = {} as JobFlameGraph;
+ public allSubtasks = 'all';
+ public listOfRunningSubtasks: string[] = [this.allSubtasks];
public graphType = FlameGraphType.ON_CPU;
+ public subtaskIndex = this.allSubtasks;
private readonly destroy$ = new Subject<void>();
@@ -67,6 +74,7 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
) {}
public ngOnInit(): void {
+ this.requestRunningSubtasks();
this.requestFlameGraph(this.graphType);
}
@@ -76,11 +84,22 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
}
private requestFlameGraph(graphType: FlameGraphType): void {
+ this.flameGraph = {} as JobFlameGraph;
this.jobLocalService
.jobWithVertexChanges()
.pipe(
tap(data => (this.selectedVertex = data.vertex)),
- mergeMap(data => this.jobService.loadOperatorFlameGraph(data.job.jid, data.vertex!.id, graphType)),
+ mergeMap(data => {
+ if (this.subtaskIndex === this.allSubtasks) {
+ return this.jobService.loadOperatorFlameGraph(data.job.jid, data.vertex!.id, graphType);
+ }
+ return this.jobService.loadOperatorFlameGraphForSingleSubtask(
+ data.job.jid,
+ data.vertex!.id,
+ graphType,
+ this.subtaskIndex
+ );
+ }),
takeUntil(this.destroy$)
)
.subscribe(
@@ -100,9 +119,43 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
);
}
+ /**
+ * Subscribes to job/vertex changes and, for every emission, loads the subtasks of the
+ * selected vertex. Subtasks whose status is `RUNNING` are offered in
+ * `listOfRunningSubtasks` (as strings), always preceded by the `allSubtasks` entry.
+ * If the request fails, the list falls back to the `allSubtasks` entry only.
+ * The stream ends as soon as `destroy$` emits.
+ */
+ private requestRunningSubtasks(): void {
+ this.jobLocalService
+ .jobWithVertexChanges()
+ .pipe(
+ tap(data => (this.selectedVertex = data.vertex)),
+ mergeMap(data => this.jobService.loadSubTasks(data.job.jid, data.vertex!.id)),
+ takeUntil(this.destroy$)
+ )
+ .subscribe(
+ data => {
+ // Chain `subtasks` optionally as well: a response without that field would
+ // otherwise throw here and never reach the isNil guard below.
+ const runningSubtasks = data?.subtasks
+ ?.filter(subtaskInfo => subtaskInfo.status === 'RUNNING')
+ .map(subtaskInfo => subtaskInfo.subtask.toString());
+ if (isNil(runningSubtasks)) {
+ // Nothing usable in the response: keep the currently displayed list.
+ return;
+ }
+ this.listOfRunningSubtasks = [this.allSubtasks, ...runningSubtasks];
+ this.cdr.markForCheck();
+ },
+ () => {
+ this.listOfRunningSubtasks = [this.allSubtasks];
+ this.cdr.markForCheck();
+ }
+ );
+ }
+
+ /**
+ * Handler for the subtask selector: stores the chosen entry and reloads the flame graph
+ * for it, using the graph type currently held in `graphType`.
+ *
+ * @param subtaskIndex the selected option, either `allSubtasks` or a subtask index as string
+ */
+ public selectSubtask(subtaskIndex: string): void {
+ // Emitting on destroy$ ends every stream piped through takeUntil(this.destroy$).
+ // That includes the running-subtasks stream started in ngOnInit, so it has to be
+ // restarted below together with the flame-graph stream; otherwise the subtask list
+ // would stop refreshing after the first selection.
+ this.destroy$.next();
+ this.subtaskIndex = subtaskIndex;
+ this.cdr.markForCheck();
+ this.requestRunningSubtasks();
+ this.requestFlameGraph(this.graphType);
+ }
+
+ /**
+ * Handler for the graph-type selector: remembers the chosen type and reloads the flame
+ * graph with it.
+ *
+ * @param graphType the flame graph type to display
+ */
public selectFrameGraphType(graphType: FlameGraphType): void {
+ // Emitting on destroy$ also ends the running-subtasks stream (it is piped through
+ // takeUntil(this.destroy$)), so that stream is restarted below as well.
this.destroy$.next();
- this.flameGraph = {} as JobFlameGraph;
+ // Keep the field in sync: selectSubtask() re-requests with this.graphType and would
+ // otherwise fall back to the initial type.
+ this.graphType = graphType;
+ this.cdr.markForCheck();
+ this.requestRunningSubtasks();
this.requestFlameGraph(graphType);
}
}
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index c3c92a25e76..21eb73bc3f4 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -122,6 +122,17 @@ export class JobService {
);
}
+ /**
+ * Issues a GET request for the flame graph of a single subtask of a job vertex.
+ *
+ * @param jobId id of the job, used as a path segment
+ * @param vertexId id of the vertex, used as a path segment
+ * @param type flame graph type, sent as the `type` query parameter
+ * @param subtaskIndex subtask to sample, sent as the `subtaskindex` query parameter
+ * @returns an observable of the flame graph response
+ */
+ public loadOperatorFlameGraphForSingleSubtask(
+ jobId: string,
+ vertexId: string,
+ type: string,
+ subtaskIndex: string
+ ): Observable<JobFlameGraph> {
+ const vertexUrl = `${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}`;
+ const query = `type=${type}&subtaskindex=${subtaskIndex}`;
+ return this.httpClient.get<JobFlameGraph>(`${vertexUrl}/flamegraph?${query}`);
+ }
+
public loadSubTasks(jobId: string, vertexId: string): Observable<JobVertexSubTaskDetail> {
return this.httpClient.get<JobVertexSubTaskDetail>(
`${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}`