You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/03 09:43:10 UTC

[incubator-seatunnel] branch dev updated: [Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ca06ea173 [Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)
ca06ea173 is described below

commit ca06ea173cdd46b199fdac2f4043e83711c6f303
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jan 3 17:43:03 2023 +0800

    [Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)
---
 .../seatunnel/api/common/metrics/JobMetrics.java   |  14 +++
 .../engine/server/CoordinatorService.java          |  12 +--
 .../seatunnel/engine/server/SeaTunnelServer.java   |  11 +-
 .../seatunnel/engine/server/dag/DAGUtils.java      |  69 ++++++++++++
 .../engine/server/dag/physical/PhysicalPlan.java   |  14 ++-
 .../engine/server/master/JobHistoryService.java    |  14 +--
 .../seatunnel/engine/server/master/JobMaster.java  | 117 ++++++++-------------
 .../engine/server/metrics/JobMetricsUtil.java      |   8 +-
 .../server/service/slot/DefaultSlotService.java    |   7 +-
 9 files changed, 155 insertions(+), 111 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index a9f8c8054..7263a94ce 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -70,6 +71,19 @@ public final class JobMetrics implements Serializable {
         return new JobMetrics(metrics);
     }
 
+    public JobMetrics merge(JobMetrics jobMetrics) {
+        if (jobMetrics == null) {
+            return this;
+        }
+        Map<String, List<Measurement>> metricsMap = new HashMap<>();
+        metrics.forEach((key, value) -> metricsMap.put(key, new ArrayList<>(value)));
+        jobMetrics.metrics.forEach((key, value) -> metricsMap.merge(key, value, (v1, v2) -> {
+            v1.addAll(v2);
+            return v1;
+        }));
+        return new JobMetrics(metricsMap);
+    }
+
     /**
      * Returns all metrics present.
      */
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 086fa6783..d5d1fbf3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -177,13 +177,12 @@ public class CoordinatorService {
             nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
         );
 
-        List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
-            return CompletableFuture.runAsync(() -> {
+        List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry ->
+            CompletableFuture.runAsync(() -> {
                 logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
                 restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
                 logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
-            }, executorService);
-        }).collect(Collectors.toList());
+            }, executorService)).collect(Collectors.toList());
 
         try {
             CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
@@ -206,6 +205,7 @@ public class CoordinatorService {
                 nodeEngine,
                 executorService,
                 getResourceManager(),
+                getJobHistoryService(),
                 runningJobStateIMap,
                 runningJobStateTimestampsIMap,
                 ownedSlotProfilesIMap,
@@ -327,6 +327,7 @@ public class CoordinatorService {
             this.nodeEngine,
             executorService,
             getResourceManager(),
+            getJobHistoryService(),
             runningJobStateIMap,
             runningJobStateTimestampsIMap,
             ownedSlotProfilesIMap, engineConfig);
@@ -356,7 +357,6 @@ public class CoordinatorService {
         // storage job state and metrics to HistoryStorage
         jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo());
         jobHistoryService.storeFinishedJobState(jobMaster);
-        jobHistoryService.storeFinishedJobMetrics(jobMaster);
         removeJobIMap(jobMaster);
         runningJobMasterMap.remove(jobId);
     }
@@ -425,7 +425,7 @@ public class CoordinatorService {
         }
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
         JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-        return jobMetricsImap != null ? jobMetricsImap : jobMetrics;
+        return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
     }
 
     public JobDAGInfo getJobInfo(long jobId) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 1213103c4..c2a23aec1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -161,12 +161,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         if (isMasterNode()) {
             // The hazelcast operator request invocation will retry, We must wait enough time to wait the invocation return.
             String hazelcastInvocationMaxRetry = seaTunnelConfig.getHazelcastConfig().getProperty("hazelcast.invocation.max.retry.count");
-            int maxRetry = hazelcastInvocationMaxRetry == null ? 250 * 2 : Integer.valueOf(hazelcastInvocationMaxRetry) * 2;
+            int maxRetry = hazelcastInvocationMaxRetry == null ? 250 * 2 : Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
 
             String hazelcastRetryPause =
                 seaTunnelConfig.getHazelcastConfig().getProperty("hazelcast.invocation.retry.pause.millis");
 
-            int retryPause = hazelcastRetryPause == null ? 500 : Integer.valueOf(hazelcastRetryPause);
+            int retryPause = hazelcastRetryPause == null ? 500 : Integer.parseInt(hazelcastRetryPause);
 
             while (!coordinatorService.isCoordinatorActive() && retryCount < maxRetry && isRunning) {
                 try {
@@ -208,10 +208,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     public boolean isMasterNode() {
         // must retry until the cluster have master node
         try {
-            return RetryUtils.retryWithException(() -> {
-                return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
-            }, new RetryUtils.RetryMaterial(20, true,
-                exception -> exception instanceof NullPointerException && isRunning, 1000));
+            return RetryUtils.retryWithException(() -> nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()),
+                new RetryUtils.RetryMaterial(20, true,
+                    exception -> exception instanceof NullPointerException && isRunning, 1000));
         } catch (InterruptedException e) {
             LOGGER.info("master node check interrupted");
             return false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
new file mode 100644
index 000000000..e0552ae20
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag;
+
+import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.Edge;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.VertexInfo;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DAGUtils {
+
+    public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag, JobImmutableInformation jobImmutableInformation, boolean isPhysicalDAGIInfo) {
+        List<Pipeline> pipelines = new ExecutionPlanGenerator(logicalDag, jobImmutableInformation).generate().getPipelines();
+        if (isPhysicalDAGIInfo) {
+            // Generate ExecutePlan DAG
+            Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
+            Map<Long, VertexInfo> vertexInfoMap = new HashMap<>();
+            pipelines.forEach(pipeline -> {
+                pipelineWithEdges.put(pipeline.getId(), pipeline.getEdges().stream()
+                    .map(e -> new Edge(e.getLeftVertexId(), e.getRightVertexId())).collect(Collectors.toList()));
+                pipeline.getVertexes().forEach((id, vertex) -> {
+                    vertexInfoMap.put(id, new VertexInfo(vertex.getVertexId(), ActionUtils.getActionType(vertex.getAction()), vertex.getAction().getName()));
+                });
+            });
+            return new JobDAGInfo(jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+        } else {
+            // Generate LogicalPlan DAG
+            List<Edge> edges = logicalDag.getEdges().stream()
+                .map(e -> new Edge(e.getInputVertexId(), e.getTargetVertexId())).collect(Collectors.toList());
+
+            Map<Long, LogicalVertex> logicalVertexMap = logicalDag.getLogicalVertexMap();
+            Map<Long, VertexInfo> vertexInfoMap = logicalVertexMap.values().stream().map(v -> new VertexInfo(v.getVertexId(),
+                ActionUtils.getActionType(v.getAction()), v.getAction().getName())).collect(Collectors.toMap(VertexInfo::getVertexId, Function.identity()));
+
+            Map<Integer, List<Edge>> pipelineWithEdges = edges.stream().collect(Collectors.groupingBy(e -> {
+                LogicalVertex info = logicalVertexMap.get(e.getInputVertexId() != null ? e.getInputVertexId() : e.getTargetVertexId());
+                return pipelines.stream().filter(p -> p.getActions().containsKey(info.getAction().getId())).findFirst().get().getId();
+            }, Collectors.toList()));
+            return new JobDAGInfo(jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+        }
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 6aff26532..411613d9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -56,7 +56,7 @@ public class PhysicalPlan {
     private final IMap<Object, Object> runningJobStateIMap;
 
     /**
-     * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+     * Timestamps (in milliseconds) as returned by {@code System.currentTimeMillis()} when the
      * execution graph transitioned into a certain state. The index into this array is the ordinal
      * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
      * stateTimestamps[RUNNING.ordinal()]}.
@@ -149,7 +149,6 @@ public class PhysicalPlan {
                         cancelJob();
                     }
                     LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
-                    jobMaster.releasePipelineResource(subPlan);
                 } else if (PipelineStatus.FAILED.equals(pipelineState)) {
                     if (canRestorePipeline(subPlan)) {
                         subPlan.restorePipeline();
@@ -159,11 +158,9 @@ public class PhysicalPlan {
                     if (makeJobEndWhenPipelineEnded) {
                         cancelJob();
                     }
-                    jobMaster.releasePipelineResource(subPlan);
                     LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
                 }
-
-                notifyCheckpointManagerPipelineEnd(subPlan);
+                subPlanDone(subPlan);
 
                 if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
                     if (failedPipelineNum.get() > 0) {
@@ -182,8 +179,15 @@ public class PhysicalPlan {
         });
     }
 
+    private void subPlanDone(SubPlan subPlan) {
+        jobMaster.savePipelineMetricsToHistory(subPlan.getPipelineLocation());
+        jobMaster.releasePipelineResource(subPlan);
+        notifyCheckpointManagerPipelineEnd(subPlan);
+    }
+
     /**
      * only call when the pipeline will never restart
+     *
      * @param subPlan subPlan
      */
     private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 5ca664c7d..4a0378104 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -18,14 +18,12 @@
 package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -40,7 +38,6 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -153,13 +150,10 @@ public class JobHistoryService {
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    public void storeFinishedJobMetrics(JobMaster jobMaster) {
-        List<RawJobMetrics> currJobMetrics = jobMaster.getCurrJobMetrics();
-        JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
-        Long jobId = jobMaster.getJobImmutableInformation().getJobId();
-        finishedJobMetricsImap.put(jobId, jobMetrics, 14, TimeUnit.DAYS);
-        //Clean TaskGroupContext for TaskExecutionServer
-        jobMaster.cleanTaskGroupContext();
+    public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) {
+        finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new HashMap<>()));
+        JobMetrics newMetrics = finishedJobMetricsImap.get(jobId).merge(metrics);
+        finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
     }
 
     private JobStateData toJobStateMapper(JobMaster jobMaster) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 34250fb48..3940e2744 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.master;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -31,31 +32,27 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
-import org.apache.seatunnel.engine.core.job.Edge;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.VertexInfo;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
-import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
-import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
 import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.cluster.Address;
@@ -67,21 +64,19 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
-public class JobMaster extends Thread {
+public class JobMaster {
     private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
 
     private PhysicalPlan physicalPlan;
@@ -95,6 +90,8 @@ public class JobMaster extends Thread {
 
     private final ResourceManager resourceManager;
 
+    private final JobHistoryService jobHistoryService;
+
     private CheckpointManager checkpointManager;
 
     private CompletableFuture<JobStatus> jobMasterCompleteFuture;
@@ -121,7 +118,7 @@ public class JobMaster extends Thread {
     private volatile boolean restore = false;
 
     // TODO add config to change value
-    private boolean isPhyicalDAGInfo = true;
+    private boolean isPhysicalDAGIInfo = true;
 
     private final EngineConfig engineConfig;
 
@@ -131,6 +128,7 @@ public class JobMaster extends Thread {
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
                      @NonNull ResourceManager resourceManager,
+                     @NonNull JobHistoryService jobHistoryService,
                      @NonNull IMap runningJobStateIMap,
                      @NonNull IMap runningJobStateTimestampsIMap,
                      @NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
@@ -141,6 +139,7 @@ public class JobMaster extends Thread {
             this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
         this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
         this.resourceManager = resourceManager;
+        this.jobHistoryService = jobHistoryService;
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
         this.engineConfig = engineConfig;
@@ -248,37 +247,8 @@ public class JobMaster extends Thread {
     }
 
     public JobDAGInfo getJobDAGInfo() {
-        if (jobDAGInfo != null) {
-            return jobDAGInfo;
-        }
-        List<Pipeline> pipelines = new ExecutionPlanGenerator(this.logicalDag, this.getJobImmutableInformation()).generate().getPipelines();
-
-        if (isPhyicalDAGInfo) {
-            // Generate ExecutePlan DAG
-            Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
-            Map<Long, VertexInfo> vertexInfoMap = new HashMap<>();
-            pipelines.forEach(pipeline -> {
-                pipelineWithEdges.put(pipeline.getId(), pipeline.getEdges().stream()
-                    .map(e -> new Edge(e.getLeftVertexId(), e.getRightVertexId())).collect(Collectors.toList()));
-                pipeline.getVertexes().forEach((id, vertex) -> {
-                    vertexInfoMap.put(id, new VertexInfo(vertex.getVertexId(), ActionUtils.getActionType(vertex.getAction()), vertex.getAction().getName()));
-                });
-            });
-            jobDAGInfo = new JobDAGInfo(this.jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
-        } else {
-            // Generate LogicalPlan DAG
-            List<Edge> edges = this.logicalDag.getEdges().stream()
-                .map(e -> new Edge(e.getInputVertexId(), e.getTargetVertexId())).collect(Collectors.toList());
-
-            Map<Long, LogicalVertex> logicalVertexMap = this.logicalDag.getLogicalVertexMap();
-            Map<Long, VertexInfo> vertexInfoMap = logicalVertexMap.values().stream().map(v -> new VertexInfo(v.getVertexId(),
-                ActionUtils.getActionType(v.getAction()), v.getAction().getName())).collect(Collectors.toMap(VertexInfo::getVertexId, Function.identity()));
-
-            Map<Integer, List<Edge>> pipelineWithEdges = edges.stream().collect(Collectors.groupingBy(e -> {
-                LogicalVertex info = logicalVertexMap.get(e.getInputVertexId() != null ? e.getInputVertexId() : e.getTargetVertexId());
-                return pipelines.stream().filter(p -> p.getActions().containsKey(info.getAction().getId())).findFirst().get().getId();
-            }, Collectors.toList()));
-            jobDAGInfo = new JobDAGInfo(this.jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+        if (jobDAGInfo == null) {
+            jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, isPhysicalDAGIInfo);
         }
         return jobDAGInfo;
     }
@@ -338,41 +308,46 @@ public class JobMaster extends Thread {
     }
 
     public List<RawJobMetrics> getCurrJobMetrics() {
+        return getCurrJobMetrics(ownedSlotProfilesIMap.values());
+    }
+
+    public List<RawJobMetrics> getCurrJobMetrics(Collection<Map<TaskGroupLocation, SlotProfile>> groupLocations) {
         List<RawJobMetrics> metrics = new ArrayList<>();
-        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
-            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+        for (Map<TaskGroupLocation, SlotProfile> groupLocation : groupLocations) {
+            groupLocation.forEach((taskGroupLocation, slotProfile) -> {
                 if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
-                    Address worker = slotProfile.getWorker();
-                    InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
-                        SeaTunnelServer.SERVICE_NAME,
-                        new GetTaskGroupMetricsOperation(taskGroupLocation),
-                        worker).invoke();
                     try {
-                        RawJobMetrics rawJobMetrics = (RawJobMetrics) invoke.get();
+                        RawJobMetrics rawJobMetrics = (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+                            new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
                         metrics.add(rawJobMetrics);
                     } catch (Exception e) {
                         throw new SeaTunnelException(e.getMessage());
                     }
                 }
             });
-        });
+        }
         return metrics;
     }
 
-    public void cleanTaskGroupContext() {
-        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
-            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
-                Address worker = slotProfile.getWorker();
-                InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
-                    SeaTunnelServer.SERVICE_NAME,
-                    new CleanTaskGroupContextOperation(taskGroupLocation),
-                    worker).invoke();
-                try {
-                    invoke.get();
-                } catch (Exception e) {
-                    throw new SeaTunnelException(e.getMessage());
-                }
-            });
+    public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
+        List<RawJobMetrics> currJobMetrics = this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
+        JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
+        long jobId = this.getJobImmutableInformation().getJobId();
+        synchronized (this) {
+            jobHistoryService.storeFinishedPipelineMetrics(jobId, jobMetrics);
+        }
+        //Clean TaskGroupContext for TaskExecutionServer
+        this.cleanTaskGroupContext(pipelineLocation);
+    }
+
+    private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
+        ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation, slotProfile) -> {
+            try {
+                NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+                    new CleanTaskGroupContextOperation(taskGroupLocation), slotProfile.getWorker()).get();
+            } catch (Exception e) {
+                throw new SeaTunnelException(e.getMessage());
+            }
         });
     }
 
@@ -438,12 +413,8 @@ public class JobMaster extends Thread {
     }
 
     public void interrupt() {
-        try {
-            isRunning = false;
-            jobMasterCompleteFuture.cancel(true);
-        } finally {
-            super.interrupt();
-        }
+        isRunning = false;
+        jobMasterCompleteFuture.cancel(true);
     }
 
     public void markRestore() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
index f53f7fec6..a257c6e43 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -58,19 +58,15 @@ public final class JobMetricsUtil {
     }
 
     public static JobMetrics toJobMetrics(List<RawJobMetrics> rawJobMetrics) {
-        JobMetricsConsumer consumer = null;
+        JobMetricsConsumer consumer = new JobMetricsConsumer();
         for (RawJobMetrics metrics : rawJobMetrics) {
             if (metrics.getBlob() == null) {
                 continue;
             }
-            if (consumer == null) {
-                consumer = new JobMetricsConsumer();
-            }
             consumer.timestamp = metrics.getTimestamp();
             MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
         }
-        return consumer == null ? JobMetrics.empty() : JobMetrics.of(consumer.metrics);
-
+        return JobMetrics.of(consumer.metrics);
     }
 
     private static class JobMetricsConsumer implements MetricConsumer {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 75b692302..23b308a6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.service.slot;
 
 import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
@@ -27,11 +26,11 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngineImpl;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
@@ -214,9 +213,7 @@ public class DefaultSlotService implements SlotService {
     }
 
     public <E> InvocationFuture<E> sendToMaster(Operation operation) {
-        InvocationBuilder invocationBuilder = nodeEngine.getOperationService()
-            .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
-        return invocationBuilder.invoke();
+        return NodeEngineUtil.sendOperationToMasterNode(nodeEngine, operation);
     }
 
 }