You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/03 09:43:10 UTC
[incubator-seatunnel] branch dev updated: [Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ca06ea173 [Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)
ca06ea173 is described below
commit ca06ea173cdd46b199fdac2f4043e83711c6f303
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jan 3 17:43:03 2023 +0800
[Improve] [Engine] Fix Engine Metrics will lose when Job be canceled. (#3797)
---
.../seatunnel/api/common/metrics/JobMetrics.java | 14 +++
.../engine/server/CoordinatorService.java | 12 +--
.../seatunnel/engine/server/SeaTunnelServer.java | 11 +-
.../seatunnel/engine/server/dag/DAGUtils.java | 69 ++++++++++++
.../engine/server/dag/physical/PhysicalPlan.java | 14 ++-
.../engine/server/master/JobHistoryService.java | 14 +--
.../seatunnel/engine/server/master/JobMaster.java | 117 ++++++++-------------
.../engine/server/metrics/JobMetricsUtil.java | 8 +-
.../server/service/slot/DefaultSlotService.java | 7 +-
9 files changed, 155 insertions(+), 111 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index a9f8c8054..7263a94ce 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -70,6 +71,19 @@ public final class JobMetrics implements Serializable {
return new JobMetrics(metrics);
}
+    /**
+     * Returns a new {@code JobMetrics} that holds, per metric name, the measurements of this
+     * instance followed by the measurements of {@code jobMetrics}. Neither input is modified.
+     *
+     * @param jobMetrics the metrics to append; may be {@code null}
+     * @return {@code this} when {@code jobMetrics} is {@code null}, otherwise a new merged instance
+     */
+    public JobMetrics merge(JobMetrics jobMetrics) {
+        if (jobMetrics == null) {
+            return this;
+        }
+        Map<String, List<Measurement>> metricsMap = new HashMap<>();
+        metrics.forEach((key, value) -> metricsMap.put(key, new ArrayList<>(value)));
+        // Always append into a list owned by the merged map. Map.merge would store the
+        // argument's own list for names missing on this side, so the result would share a
+        // mutable list with the argument.
+        jobMetrics.metrics.forEach((key, value) ->
+            metricsMap.computeIfAbsent(key, name -> new ArrayList<>()).addAll(value));
+        return new JobMetrics(metricsMap);
+    }
+
/**
* Returns all metrics present.
*/
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 086fa6783..d5d1fbf3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -177,13 +177,12 @@ public class CoordinatorService {
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
);
- List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
- return CompletableFuture.runAsync(() -> {
+ List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry ->
+ CompletableFuture.runAsync(() -> {
logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
- }, executorService);
- }).collect(Collectors.toList());
+ }, executorService)).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
@@ -206,6 +205,7 @@ public class CoordinatorService {
nodeEngine,
executorService,
getResourceManager(),
+ getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
@@ -327,6 +327,7 @@ public class CoordinatorService {
this.nodeEngine,
executorService,
getResourceManager(),
+ getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap, engineConfig);
@@ -356,7 +357,6 @@ public class CoordinatorService {
// storage job state and metrics to HistoryStorage
jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo());
jobHistoryService.storeFinishedJobState(jobMaster);
- jobHistoryService.storeFinishedJobMetrics(jobMaster);
removeJobIMap(jobMaster);
runningJobMasterMap.remove(jobId);
}
@@ -425,7 +425,7 @@ public class CoordinatorService {
}
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
- return jobMetricsImap != null ? jobMetricsImap : jobMetrics;
+ return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
}
public JobDAGInfo getJobInfo(long jobId) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 1213103c4..c2a23aec1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -161,12 +161,12 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
if (isMasterNode()) {
// The hazelcast operator request invocation will retry, We must wait enough time to wait the invocation return.
String hazelcastInvocationMaxRetry = seaTunnelConfig.getHazelcastConfig().getProperty("hazelcast.invocation.max.retry.count");
- int maxRetry = hazelcastInvocationMaxRetry == null ? 250 * 2 : Integer.valueOf(hazelcastInvocationMaxRetry) * 2;
+ int maxRetry = hazelcastInvocationMaxRetry == null ? 250 * 2 : Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
String hazelcastRetryPause =
seaTunnelConfig.getHazelcastConfig().getProperty("hazelcast.invocation.retry.pause.millis");
- int retryPause = hazelcastRetryPause == null ? 500 : Integer.valueOf(hazelcastRetryPause);
+ int retryPause = hazelcastRetryPause == null ? 500 : Integer.parseInt(hazelcastRetryPause);
while (!coordinatorService.isCoordinatorActive() && retryCount < maxRetry && isRunning) {
try {
@@ -208,10 +208,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
public boolean isMasterNode() {
// must retry until the cluster have master node
try {
- return RetryUtils.retryWithException(() -> {
- return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
- }, new RetryUtils.RetryMaterial(20, true,
- exception -> exception instanceof NullPointerException && isRunning, 1000));
+ return RetryUtils.retryWithException(() -> nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()),
+ new RetryUtils.RetryMaterial(20, true,
+ exception -> exception instanceof NullPointerException && isRunning, 1000));
} catch (InterruptedException e) {
LOGGER.info("master node check interrupted");
return false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
new file mode 100644
index 000000000..e0552ae20
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag;
+
+import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.Edge;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.VertexInfo;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Static helpers that turn a job's {@link LogicalDag} into a {@link JobDAGInfo}
+ * (pipeline id to edges, vertex id to vertex info).
+ */
+public final class DAGUtils {
+
+    private DAGUtils() {
+        // static utility holder, not meant to be instantiated
+    }
+
+    /**
+     * Builds the DAG description of a job.
+     *
+     * @param logicalDag              the job's logical DAG
+     * @param jobImmutableInformation supplies the job id and is handed to the execution plan generator
+     * @param isPhysicalDAGIInfo      {@code true} to describe the generated execution-plan pipelines,
+     *                                {@code false} to describe the logical DAG grouped by owning pipeline
+     * @return the DAG info of the job
+     * @throws java.util.NoSuchElementException if a logical edge's vertex action is contained in no pipeline
+     */
+    public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag, JobImmutableInformation jobImmutableInformation, boolean isPhysicalDAGIInfo) {
+        List<Pipeline> pipelines = new ExecutionPlanGenerator(logicalDag, jobImmutableInformation).generate().getPipelines();
+        if (isPhysicalDAGIInfo) {
+            // Generate ExecutePlan DAG
+            Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
+            Map<Long, VertexInfo> vertexInfoMap = new HashMap<>();
+            pipelines.forEach(pipeline -> {
+                pipelineWithEdges.put(pipeline.getId(), pipeline.getEdges().stream()
+                    .map(e -> new Edge(e.getLeftVertexId(), e.getRightVertexId())).collect(Collectors.toList()));
+                pipeline.getVertexes().forEach((id, vertex) ->
+                    vertexInfoMap.put(id, new VertexInfo(vertex.getVertexId(), ActionUtils.getActionType(vertex.getAction()), vertex.getAction().getName())));
+            });
+            return new JobDAGInfo(jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+        }
+
+        // Generate LogicalPlan DAG
+        List<Edge> edges = logicalDag.getEdges().stream()
+            .map(e -> new Edge(e.getInputVertexId(), e.getTargetVertexId())).collect(Collectors.toList());
+
+        Map<Long, LogicalVertex> logicalVertexMap = logicalDag.getLogicalVertexMap();
+        Map<Long, VertexInfo> vertexInfoMap = logicalVertexMap.values().stream().map(v -> new VertexInfo(v.getVertexId(),
+            ActionUtils.getActionType(v.getAction()), v.getAction().getName())).collect(Collectors.toMap(VertexInfo::getVertexId, Function.identity()));
+
+        // Each edge is filed under the first pipeline that contains the action of its input
+        // vertex (or of its target vertex when the edge has no input vertex id).
+        Map<Integer, List<Edge>> pipelineWithEdges = edges.stream().collect(Collectors.groupingBy(e -> {
+            LogicalVertex info = logicalVertexMap.get(e.getInputVertexId() != null ? e.getInputVertexId() : e.getTargetVertexId());
+            return pipelines.stream()
+                .filter(p -> p.getActions().containsKey(info.getAction().getId()))
+                .findFirst()
+                .map(Pipeline::getId)
+                .orElseThrow(() -> new java.util.NoSuchElementException(
+                    "No pipeline contains action " + info.getAction().getId()));
+        }, Collectors.toList()));
+        return new JobDAGInfo(jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 6aff26532..411613d9a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -56,7 +56,7 @@ public class PhysicalPlan {
private final IMap<Object, Object> runningJobStateIMap;
/**
- * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+ * Timestamps (in milliseconds) as returned by {@code System.currentTimeMillis()} when the
* execution graph transitioned into a certain state. The index into this array is the ordinal
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
@@ -149,7 +149,6 @@ public class PhysicalPlan {
cancelJob();
}
LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
- jobMaster.releasePipelineResource(subPlan);
} else if (PipelineStatus.FAILED.equals(pipelineState)) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
@@ -159,11 +158,9 @@ public class PhysicalPlan {
if (makeJobEndWhenPipelineEnded) {
cancelJob();
}
- jobMaster.releasePipelineResource(subPlan);
LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
}
-
- notifyCheckpointManagerPipelineEnd(subPlan);
+ subPlanDone(subPlan);
if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
if (failedPipelineNum.get() > 0) {
@@ -182,8 +179,15 @@ public class PhysicalPlan {
});
}
+    /**
+     * Runs the end-of-pipeline steps in order: store the pipeline's metrics in the job
+     * history, release the pipeline's resources, then notify the checkpoint manager.
+     *
+     * @param subPlan the pipeline that ended
+     */
+    private void subPlanDone(SubPlan subPlan) {
+        try {
+            // Metrics are read before the release; presumably the release drops the slot
+            // profiles that the metrics lookup needs (verify in JobMaster).
+            jobMaster.savePipelineMetricsToHistory(subPlan.getPipelineLocation());
+        } catch (Exception e) {
+            // Metrics are best effort: a failed collection must not skip the resource
+            // release and the checkpoint notification below.
+            LOGGER.warning(String.format("save metrics of pipeline %s to history failed: %s",
+                subPlan.getPipelineFullName(), e));
+        }
+        jobMaster.releasePipelineResource(subPlan);
+        notifyCheckpointManagerPipelineEnd(subPlan);
+    }
+
/**
* only call when the pipeline will never restart
+ *
* @param subPlan subPlan
*/
private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 5ca664c7d..4a0378104 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -18,14 +18,12 @@
package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -40,7 +38,6 @@ import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -153,13 +150,10 @@ public class JobHistoryService {
}
@SuppressWarnings("checkstyle:MagicNumber")
- public void storeFinishedJobMetrics(JobMaster jobMaster) {
- List<RawJobMetrics> currJobMetrics = jobMaster.getCurrJobMetrics();
- JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
- Long jobId = jobMaster.getJobImmutableInformation().getJobId();
- finishedJobMetricsImap.put(jobId, jobMetrics, 14, TimeUnit.DAYS);
- //Clean TaskGroupContext for TaskExecutionServer
- jobMaster.cleanTaskGroupContext();
+ public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) { // merges the given metrics into the entry stored for jobId
+ finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new HashMap<>())); // seed an empty entry the first time this job id is stored
+ JobMetrics newMetrics = finishedJobMetricsImap.get(jobId).merge(metrics); // stored measurements first, then the new ones
+ finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS); // write back with a 14-day TTL. NOTE(review): the get/merge/put sequence is not atomic here; JobMaster calls it inside synchronized (this)
}
private JobStateData toJobStateMapper(JobMaster jobMaster) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 34250fb48..3940e2744 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.master;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -31,31 +32,27 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
-import org.apache.seatunnel.engine.core.job.Edge;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.job.VertexInfo;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
-import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
-import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
@@ -67,21 +64,19 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-public class JobMaster extends Thread {
+public class JobMaster {
private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
private PhysicalPlan physicalPlan;
@@ -95,6 +90,8 @@ public class JobMaster extends Thread {
private final ResourceManager resourceManager;
+ private final JobHistoryService jobHistoryService;
+
private CheckpointManager checkpointManager;
private CompletableFuture<JobStatus> jobMasterCompleteFuture;
@@ -121,7 +118,7 @@ public class JobMaster extends Thread {
private volatile boolean restore = false;
// TODO add config to change value
- private boolean isPhyicalDAGInfo = true;
+ private boolean isPhysicalDAGIInfo = true;
private final EngineConfig engineConfig;
@@ -131,6 +128,7 @@ public class JobMaster extends Thread {
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@NonNull ResourceManager resourceManager,
+ @NonNull JobHistoryService jobHistoryService,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
@NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
@@ -141,6 +139,7 @@ public class JobMaster extends Thread {
this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
this.resourceManager = resourceManager;
+ this.jobHistoryService = jobHistoryService;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.engineConfig = engineConfig;
@@ -248,37 +247,8 @@ public class JobMaster extends Thread {
}
public JobDAGInfo getJobDAGInfo() {
- if (jobDAGInfo != null) {
- return jobDAGInfo;
- }
- List<Pipeline> pipelines = new ExecutionPlanGenerator(this.logicalDag, this.getJobImmutableInformation()).generate().getPipelines();
-
- if (isPhyicalDAGInfo) {
- // Generate ExecutePlan DAG
- Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
- Map<Long, VertexInfo> vertexInfoMap = new HashMap<>();
- pipelines.forEach(pipeline -> {
- pipelineWithEdges.put(pipeline.getId(), pipeline.getEdges().stream()
- .map(e -> new Edge(e.getLeftVertexId(), e.getRightVertexId())).collect(Collectors.toList()));
- pipeline.getVertexes().forEach((id, vertex) -> {
- vertexInfoMap.put(id, new VertexInfo(vertex.getVertexId(), ActionUtils.getActionType(vertex.getAction()), vertex.getAction().getName()));
- });
- });
- jobDAGInfo = new JobDAGInfo(this.jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
- } else {
- // Generate LogicalPlan DAG
- List<Edge> edges = this.logicalDag.getEdges().stream()
- .map(e -> new Edge(e.getInputVertexId(), e.getTargetVertexId())).collect(Collectors.toList());
-
- Map<Long, LogicalVertex> logicalVertexMap = this.logicalDag.getLogicalVertexMap();
- Map<Long, VertexInfo> vertexInfoMap = logicalVertexMap.values().stream().map(v -> new VertexInfo(v.getVertexId(),
- ActionUtils.getActionType(v.getAction()), v.getAction().getName())).collect(Collectors.toMap(VertexInfo::getVertexId, Function.identity()));
-
- Map<Integer, List<Edge>> pipelineWithEdges = edges.stream().collect(Collectors.groupingBy(e -> {
- LogicalVertex info = logicalVertexMap.get(e.getInputVertexId() != null ? e.getInputVertexId() : e.getTargetVertexId());
- return pipelines.stream().filter(p -> p.getActions().containsKey(info.getAction().getId())).findFirst().get().getId();
- }, Collectors.toList()));
- jobDAGInfo = new JobDAGInfo(this.jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+ if (jobDAGInfo == null) { // build the DAG info on the first call only and reuse it afterwards
+ jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, isPhysicalDAGIInfo); // construction logic now lives in DAGUtils
}
return jobDAGInfo;
}
@@ -338,41 +308,46 @@ public class JobMaster extends Thread {
}
public List<RawJobMetrics> getCurrJobMetrics() {
+ return getCurrJobMetrics(ownedSlotProfilesIMap.values()); // no-arg form: collect over every slot-profile map held in ownedSlotProfilesIMap
+ }
+
+ public List<RawJobMetrics> getCurrJobMetrics(Collection<Map<TaskGroupLocation, SlotProfile>> groupLocations) { // collects raw metrics for the task groups in the given maps that belong to this job
List<RawJobMetrics> metrics = new ArrayList<>();
- ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
- taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+ for (Map<TaskGroupLocation, SlotProfile> groupLocation : groupLocations) { // NOTE(review): a null element (e.g. no slot profiles for a pipeline) would fail on the next line - confirm callers never pass one
+ groupLocation.forEach((taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
- Address worker = slotProfile.getWorker();
- InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
- SeaTunnelServer.SERVICE_NAME,
- new GetTaskGroupMetricsOperation(taskGroupLocation),
- worker).invoke();
try {
- RawJobMetrics rawJobMetrics = (RawJobMetrics) invoke.get();
+ RawJobMetrics rawJobMetrics = (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine, // ask the worker that owns the slot for this task group's metrics
+ new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get(); // get() waits for the reply. NOTE(review): the catch below rethrows with only e.getMessage(), so the original cause is lost
metrics.add(rawJobMetrics);
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}
}
});
- });
+ }
return metrics;
}
- public void cleanTaskGroupContext() {
- ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
- taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
- Address worker = slotProfile.getWorker();
- InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
- SeaTunnelServer.SERVICE_NAME,
- new CleanTaskGroupContextOperation(taskGroupLocation),
- worker).invoke();
- try {
- invoke.get();
- } catch (Exception e) {
- throw new SeaTunnelException(e.getMessage());
- }
- });
+ public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) { // stores one pipeline's current metrics in the job history, then cleans that pipeline's task group contexts
+ List<RawJobMetrics> currJobMetrics = this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation))); // restrict collection to this pipeline's slot profiles
+ JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
+ long jobId = this.getJobImmutableInformation().getJobId();
+ synchronized (this) { // serializes history updates made through this JobMaster; the store call is a read-merge-write
+ jobHistoryService.storeFinishedPipelineMetrics(jobId, jobMetrics);
+ }
+ //Clean TaskGroupContext for TaskExecutionServer
+ this.cleanTaskGroupContext(pipelineLocation);
+ }
+
+ private void cleanTaskGroupContext(PipelineLocation pipelineLocation) { // sends a CleanTaskGroupContextOperation to the worker of every task group of the pipeline
+ ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation, slotProfile) -> { // NOTE(review): reads the IMap directly while the caller uses getOwnedSlotProfiles; a missing entry would throw a NullPointerException here
+ try {
+ NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+ new CleanTaskGroupContextOperation(taskGroupLocation), slotProfile.getWorker()).get(); // get() waits for each worker before moving to the next one
+ } catch (Exception e) {
+ throw new SeaTunnelException(e.getMessage()); // NOTE(review): only the message is kept, the cause is dropped
+ }
});
}
@@ -438,12 +413,8 @@ public class JobMaster extends Thread {
}
public void interrupt() {
- try {
- isRunning = false;
- jobMasterCompleteFuture.cancel(true);
- } finally {
- super.interrupt();
- }
+ isRunning = false; // JobMaster no longer extends Thread, so there is no super.interrupt() to call any more
+ jobMasterCompleteFuture.cancel(true); // cancels the job master's completion future
}
public void markRestore() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
index f53f7fec6..a257c6e43 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -58,19 +58,15 @@ public final class JobMetricsUtil {
}
public static JobMetrics toJobMetrics(List<RawJobMetrics> rawJobMetrics) {
- JobMetricsConsumer consumer = null;
+ JobMetricsConsumer consumer = new JobMetricsConsumer(); // created up front, so an input without blobs yields JobMetrics.of(consumer.metrics) with whatever the fresh consumer holds
for (RawJobMetrics metrics : rawJobMetrics) {
if (metrics.getBlob() == null) {
continue;
}
- if (consumer == null) {
- consumer = new JobMetricsConsumer();
- }
consumer.timestamp = metrics.getTimestamp();
MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
}
- return consumer == null ? JobMetrics.empty() : JobMetrics.of(consumer.metrics);
-
+ return JobMetrics.of(consumer.metrics); // wrap everything the consumer accumulated across all blobs
}
private static class JobMetricsConsumer implements MetricConsumer {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 75b692302..23b308a6e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.service.slot;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
@@ -27,11 +26,11 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngineImpl;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -214,9 +213,7 @@ public class DefaultSlotService implements SlotService {
}
public <E> InvocationFuture<E> sendToMaster(Operation operation) {
- InvocationBuilder invocationBuilder = nodeEngine.getOperationService()
- .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
- return invocationBuilder.invoke();
+ return NodeEngineUtil.sendOperationToMasterNode(nodeEngine, operation); // delegates to the shared helper; presumably it builds the same invocation to the master address as the removed lines - verify in NodeEngineUtil
}
}