You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/30 07:52:33 UTC

[seatunnel] branch dev updated: [Improve][Zeta] Speed up listAllJob function (#4852)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 136431498 [Improve][Zeta] Speed up listAllJob function (#4852)
136431498 is described below

commit 13643149885db942a28999316136614400bd492a
Author: Jia Fan <fa...@qq.com>
AuthorDate: Tue May 30 15:52:26 2023 +0800

    [Improve][Zeta] Speed up listAllJob function (#4852)
---
 .../engine/server/master/JobHistoryService.java    | 91 ++++++++++++----------
 1 file changed, 50 insertions(+), 41 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 0e7bbd5bc..12dcae40c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -102,7 +102,8 @@ public class JobHistoryService {
     public String listAllJob() {
         List<JobStatusData> status = new ArrayList<>();
         Stream.concat(
-                        runningJobMasterMap.values().stream().map(this::toJobStateMapper),
+                        runningJobMasterMap.values().stream()
+                                .map(master -> toJobStateMapper(master, true)),
                         finishedJobStateImap.values().stream())
                 .forEach(
                         jobState -> {
@@ -126,7 +127,7 @@ public class JobHistoryService {
     // Get detailed status of a single job
     public JobState getJobDetailState(Long jobId) {
         return runningJobMasterMap.containsKey(jobId)
-                ? toJobStateMapper(runningJobMasterMap.get(jobId))
+                ? toJobStateMapper(runningJobMasterMap.get(jobId), false)
                 : finishedJobStateImap.getOrDefault(jobId, null);
     }
 
@@ -158,7 +159,7 @@ public class JobHistoryService {
 
     @SuppressWarnings("checkstyle:MagicNumber")
     public void storeFinishedJobState(JobMaster jobMaster) {
-        JobState jobState = toJobStateMapper(jobMaster);
+        JobState jobState = toJobStateMapper(jobMaster, false);
         jobState.setFinishTime(System.currentTimeMillis());
         finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
     }
@@ -170,47 +171,55 @@ public class JobHistoryService {
         finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
     }
 
-    private JobState toJobStateMapper(JobMaster jobMaster) {
+    private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) {
 
         Long jobId = jobMaster.getJobImmutableInformation().getJobId();
         Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap = new HashMap<>();
-
-        jobMaster
-                .getPhysicalPlan()
-                .getPipelineList()
-                .forEach(
-                        pipeline -> {
-                            PipelineLocation pipelineLocation = pipeline.getPipelineLocation();
-                            PipelineStatus pipelineState =
-                                    (PipelineStatus) runningJobStateIMap.get(pipelineLocation);
-                            Map<TaskGroupLocation, ExecutionState> taskStateMap = new HashMap<>();
-                            pipeline.getCoordinatorVertexList()
-                                    .forEach(
-                                            coordinator -> {
-                                                TaskGroupLocation taskGroupLocation =
-                                                        coordinator.getTaskGroupLocation();
-                                                taskStateMap.put(
-                                                        taskGroupLocation,
-                                                        (ExecutionState)
-                                                                runningJobStateIMap.get(
-                                                                        taskGroupLocation));
-                                            });
-                            pipeline.getPhysicalVertexList()
-                                    .forEach(
-                                            task -> {
-                                                TaskGroupLocation taskGroupLocation =
-                                                        task.getTaskGroupLocation();
-                                                taskStateMap.put(
-                                                        taskGroupLocation,
-                                                        (ExecutionState)
-                                                                runningJobStateIMap.get(
-                                                                        taskGroupLocation));
-                                            });
-
-                            PipelineStateData pipelineStateData =
-                                    new PipelineStateData(pipelineState, taskStateMap);
-                            pipelineStateMapperMap.put(pipelineLocation, pipelineStateData);
-                        });
+        if (!simple) {
+            try {
+                jobMaster
+                        .getPhysicalPlan()
+                        .getPipelineList()
+                        .forEach(
+                                pipeline -> {
+                                    PipelineLocation pipelineLocation =
+                                            pipeline.getPipelineLocation();
+                                    PipelineStatus pipelineState =
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation);
+                                    Map<TaskGroupLocation, ExecutionState> taskStateMap =
+                                            new HashMap<>();
+                                    pipeline.getCoordinatorVertexList()
+                                            .forEach(
+                                                    coordinator -> {
+                                                        TaskGroupLocation taskGroupLocation =
+                                                                coordinator.getTaskGroupLocation();
+                                                        taskStateMap.put(
+                                                                taskGroupLocation,
+                                                                (ExecutionState)
+                                                                        runningJobStateIMap.get(
+                                                                                taskGroupLocation));
+                                                    });
+                                    pipeline.getPhysicalVertexList()
+                                            .forEach(
+                                                    task -> {
+                                                        TaskGroupLocation taskGroupLocation =
+                                                                task.getTaskGroupLocation();
+                                                        taskStateMap.put(
+                                                                taskGroupLocation,
+                                                                (ExecutionState)
+                                                                        runningJobStateIMap.get(
+                                                                                taskGroupLocation));
+                                                    });
+
+                                    PipelineStateData pipelineStateData =
+                                            new PipelineStateData(pipelineState, taskStateMap);
+                                    pipelineStateMapperMap.put(pipelineLocation, pipelineStateData);
+                                });
+            } catch (Exception e) {
+                logger.warning("get job pipeline state err", e);
+            }
+        }
         JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
         String jobName = jobMaster.getJobImmutableInformation().getJobName();
         long submitTime = jobMaster.getJobImmutableInformation().getCreateTime();