You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/01/05 11:24:38 UTC

[flink] 03/03: [FLINK-30185][ui] Allow choosing the subtask index in the flame graph page

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 898fe6ba6c882b82a266fb5a415d68e9c9113cb8
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Dec 17 22:41:00 2022 +0800

    [FLINK-30185][ui] Allow choosing the subtask index in the flame graph page
    
    This closes #21447
---
 .../job-overview-drawer-flamegraph.component.html  | 18 +++++++
 .../job-overview-drawer-flamegraph.component.less  |  2 +-
 .../job-overview-drawer-flamegraph.component.ts    | 59 ++++++++++++++++++++--
 .../web-dashboard/src/app/services/job.service.ts  | 11 ++++
 4 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
index 92cd970e53b..3c34fbc40ba 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html
@@ -48,6 +48,24 @@ Type:
   </label>
 </nz-radio-group>
 
+Subtask:
+<nz-select
+  nzSize="small"
+  [ngModel]="subtaskIndex"
+  nzShowSearch
+  (ngModelChange)="selectSubtask($event)"
+>
+  <nz-option
+    *ngFor="let name of listOfRunningSubtasks"
+    [nzLabel]="name"
+    [nzValue]="name"
+    nzCustomContent
+  >
+    <span [title]="name">{{ name }}</span>
+  </nz-option>
+</nz-select>
+
+&nbsp; &nbsp;
 <ng-container [ngSwitch]="flameGraph.endTimestamp">
   <span *ngSwitchCase="-1">The task has already been terminated</span>
   <span *ngSwitchCase="-2">
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
index 58c534074c4..73716a18ca1 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.less
@@ -19,7 +19,7 @@
 @import "theme";
 
 nz-select {
-  width: 300px;
+  width: 100px;
 }
 
 .metric-selector {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
index acc182e6a6a..b4a7e92b46a 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.ts
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import { NgIf, NgSwitch, NgSwitchCase, NgSwitchDefault } from '@angular/common';
+import { NgForOf, NgIf, NgSwitch, NgSwitchCase, NgSwitchDefault } from '@angular/common';
 import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
 import { FormsModule } from '@angular/forms';
 import { Subject } from 'rxjs';
@@ -26,7 +26,9 @@ import { FlameGraphComponent } from '@flink-runtime-web/components/flame-graph/f
 import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe';
 import { FlameGraphType, JobFlameGraph, NodesItemCorrect } from '@flink-runtime-web/interfaces';
 import { JobService } from '@flink-runtime-web/services';
+import { isNil } from '@flink-runtime-web/utils';
 import { NzRadioModule } from 'ng-zorro-antd/radio';
+import { NzSelectModule } from 'ng-zorro-antd/select';
 import { NzSpinModule } from 'ng-zorro-antd/spin';
 
 import { JobLocalService } from '../../job-local.service';
@@ -38,6 +40,8 @@ import { JobLocalService } from '../../job-local.service';
   styleUrls: ['./job-overview-drawer-flamegraph.component.less'],
   imports: [
     NgIf,
+    NgForOf,
+    NzSelectModule,
     NzRadioModule,
     FormsModule,
     NgSwitch,
@@ -55,8 +59,11 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
   public now = Date.now();
   public selectedVertex: NodesItemCorrect | null;
   public flameGraph = {} as JobFlameGraph;
+  public allSubtasks = 'all';
+  public listOfRunningSubtasks: string[] = [this.allSubtasks];
 
   public graphType = FlameGraphType.ON_CPU;
+  public subtaskIndex = this.allSubtasks;
 
   private readonly destroy$ = new Subject<void>();
 
@@ -67,6 +74,7 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
   ) {}
 
   public ngOnInit(): void {
+    this.requestRunningSubtasks();
     this.requestFlameGraph(this.graphType);
   }
 
@@ -76,11 +84,22 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
   }
 
   private requestFlameGraph(graphType: FlameGraphType): void {
+    this.flameGraph = {} as JobFlameGraph;
     this.jobLocalService
       .jobWithVertexChanges()
       .pipe(
         tap(data => (this.selectedVertex = data.vertex)),
-        mergeMap(data => this.jobService.loadOperatorFlameGraph(data.job.jid, data.vertex!.id, graphType)),
+        mergeMap(data => {
+          if (this.subtaskIndex === this.allSubtasks) {
+            return this.jobService.loadOperatorFlameGraph(data.job.jid, data.vertex!.id, graphType);
+          }
+          return this.jobService.loadOperatorFlameGraphForSingleSubtask(
+            data.job.jid,
+            data.vertex!.id,
+            graphType,
+            this.subtaskIndex
+          );
+        }),
         takeUntil(this.destroy$)
       )
       .subscribe(
@@ -100,9 +119,43 @@ export class JobOverviewDrawerFlameGraphComponent implements OnInit, OnDestroy {
       );
   }
 
+  private requestRunningSubtasks(): void {
+    this.jobLocalService
+      .jobWithVertexChanges()
+      .pipe(
+        tap(data => (this.selectedVertex = data.vertex)),
+        mergeMap(data => {
+          return this.jobService.loadSubTasks(data.job.jid, data.vertex!.id);
+        }),
+        takeUntil(this.destroy$)
+      )
+      .subscribe(
+        data => {
+          const runningSubtasks = data?.subtasks
+            .filter(subtaskInfo => subtaskInfo.status === 'RUNNING')
+            .map(subtaskInfo => subtaskInfo.subtask.toString());
+          if (isNil(runningSubtasks)) {
+            return;
+          }
+          this.listOfRunningSubtasks = [this.allSubtasks, ...runningSubtasks];
+          this.cdr.markForCheck();
+        },
+        () => {
+          this.listOfRunningSubtasks = [this.allSubtasks];
+          this.cdr.markForCheck();
+        }
+      );
+  }
+
+  public selectSubtask(subtaskIndex: string): void {
+    this.destroy$.next();
+    this.subtaskIndex = subtaskIndex;
+    this.cdr.markForCheck();
+    this.requestFlameGraph(this.graphType);
+  }
+
   public selectFrameGraphType(graphType: FlameGraphType): void {
     this.destroy$.next();
-    this.flameGraph = {} as JobFlameGraph;
     this.requestFlameGraph(graphType);
   }
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index c3c92a25e76..21eb73bc3f4 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -122,6 +122,17 @@ export class JobService {
     );
   }
 
+  public loadOperatorFlameGraphForSingleSubtask(
+    jobId: string,
+    vertexId: string,
+    type: string,
+    subtaskIndex: string
+  ): Observable<JobFlameGraph> {
+    return this.httpClient.get<JobFlameGraph>(
+      `${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}/flamegraph?type=${type}&subtaskindex=${subtaskIndex}`
+    );
+  }
+
   public loadSubTasks(jobId: string, vertexId: string): Observable<JobVertexSubTaskDetail> {
     return this.httpClient.get<JobVertexSubTaskDetail>(
       `${this.configService.BASE_URL}/jobs/${jobId}/vertices/${vertexId}`