You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/30 07:52:33 UTC
[seatunnel] branch dev updated: [Improve][Zeta] Speed up listAllJob function (#4852)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 136431498 [Improve][Zeta] Speed up listAllJob function (#4852)
136431498 is described below
commit 13643149885db942a28999316136614400bd492a
Author: Jia Fan <fa...@qq.com>
AuthorDate: Tue May 30 15:52:26 2023 +0800
[Improve][Zeta] Speed up listAllJob function (#4852)
---
.../engine/server/master/JobHistoryService.java | 91 ++++++++++++----------
1 file changed, 50 insertions(+), 41 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 0e7bbd5bc..12dcae40c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -102,7 +102,8 @@ public class JobHistoryService {
public String listAllJob() {
List<JobStatusData> status = new ArrayList<>();
Stream.concat(
- runningJobMasterMap.values().stream().map(this::toJobStateMapper),
+ runningJobMasterMap.values().stream()
+ .map(master -> toJobStateMapper(master, true)),
finishedJobStateImap.values().stream())
.forEach(
jobState -> {
@@ -126,7 +127,7 @@ public class JobHistoryService {
// Get detailed status of a single job
public JobState getJobDetailState(Long jobId) {
return runningJobMasterMap.containsKey(jobId)
- ? toJobStateMapper(runningJobMasterMap.get(jobId))
+ ? toJobStateMapper(runningJobMasterMap.get(jobId), false)
: finishedJobStateImap.getOrDefault(jobId, null);
}
@@ -158,7 +159,7 @@ public class JobHistoryService {
@SuppressWarnings("checkstyle:MagicNumber")
public void storeFinishedJobState(JobMaster jobMaster) {
- JobState jobState = toJobStateMapper(jobMaster);
+ JobState jobState = toJobStateMapper(jobMaster, false);
jobState.setFinishTime(System.currentTimeMillis());
finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
}
@@ -170,47 +171,55 @@ public class JobHistoryService {
finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
}
- private JobState toJobStateMapper(JobMaster jobMaster) {
+ private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) {
Long jobId = jobMaster.getJobImmutableInformation().getJobId();
Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap = new HashMap<>();
-
- jobMaster
- .getPhysicalPlan()
- .getPipelineList()
- .forEach(
- pipeline -> {
- PipelineLocation pipelineLocation = pipeline.getPipelineLocation();
- PipelineStatus pipelineState =
- (PipelineStatus) runningJobStateIMap.get(pipelineLocation);
- Map<TaskGroupLocation, ExecutionState> taskStateMap = new HashMap<>();
- pipeline.getCoordinatorVertexList()
- .forEach(
- coordinator -> {
- TaskGroupLocation taskGroupLocation =
- coordinator.getTaskGroupLocation();
- taskStateMap.put(
- taskGroupLocation,
- (ExecutionState)
- runningJobStateIMap.get(
- taskGroupLocation));
- });
- pipeline.getPhysicalVertexList()
- .forEach(
- task -> {
- TaskGroupLocation taskGroupLocation =
- task.getTaskGroupLocation();
- taskStateMap.put(
- taskGroupLocation,
- (ExecutionState)
- runningJobStateIMap.get(
- taskGroupLocation));
- });
-
- PipelineStateData pipelineStateData =
- new PipelineStateData(pipelineState, taskStateMap);
- pipelineStateMapperMap.put(pipelineLocation, pipelineStateData);
- });
+ if (!simple) {
+ try {
+ jobMaster
+ .getPhysicalPlan()
+ .getPipelineList()
+ .forEach(
+ pipeline -> {
+ PipelineLocation pipelineLocation =
+ pipeline.getPipelineLocation();
+ PipelineStatus pipelineState =
+ (PipelineStatus)
+ runningJobStateIMap.get(pipelineLocation);
+ Map<TaskGroupLocation, ExecutionState> taskStateMap =
+ new HashMap<>();
+ pipeline.getCoordinatorVertexList()
+ .forEach(
+ coordinator -> {
+ TaskGroupLocation taskGroupLocation =
+ coordinator.getTaskGroupLocation();
+ taskStateMap.put(
+ taskGroupLocation,
+ (ExecutionState)
+ runningJobStateIMap.get(
+ taskGroupLocation));
+ });
+ pipeline.getPhysicalVertexList()
+ .forEach(
+ task -> {
+ TaskGroupLocation taskGroupLocation =
+ task.getTaskGroupLocation();
+ taskStateMap.put(
+ taskGroupLocation,
+ (ExecutionState)
+ runningJobStateIMap.get(
+ taskGroupLocation));
+ });
+
+ PipelineStateData pipelineStateData =
+ new PipelineStateData(pipelineState, taskStateMap);
+ pipelineStateMapperMap.put(pipelineLocation, pipelineStateData);
+ });
+ } catch (Exception e) {
+ logger.warning("get job pipeline state err", e);
+ }
+ }
JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
String jobName = jobMaster.getJobImmutableInformation().getJobName();
long submitTime = jobMaster.getJobImmutableInformation().getCreateTime();