You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 10:47:42 UTC
[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Put Job Status in IMap (#2699)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 651c2460d [Feature][ST-Engine] Put Job Status in IMap (#2699)
651c2460d is described below
commit 651c2460d1b9a5a56ffc1d689081008b0f4f7330
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Sep 15 18:47:35 2022 +0800
[Feature][ST-Engine] Put Job Status in IMap (#2699)
---
.../local/LocalTxtTransactionStateFileWriter.java | 2 +-
.../test/resources/batch_fakesource_to_file.conf | 2 +-
.../batch_fakesource_to_file_complex.conf | 2 +-
.../streaming_fakesource_to_file_complex.conf | 2 +-
.../test/resources/batch_fakesource_to_file.conf | 2 +-
.../batch_fakesource_to_file_complex.conf | 2 +-
.../src/test/resources/client_test.conf | 2 +-
.../seatunnel/engine/core/job/JobStatus.java | 3 +
.../seatunnel/engine/core/job/RunningJobInfo.java | 61 +++++
.../core/serializable/JobDataSerializerHook.java | 5 +
.../seatunnel/engine/server/SeaTunnelServer.java | 185 ++++++++++++--
.../engine/server/TaskExecutionService.java | 34 ++-
.../engine/server/dag/physical/PhysicalPlan.java | 219 +++++++++--------
.../server/dag/physical/PhysicalPlanGenerator.java | 39 ++-
.../engine/server/dag/physical/PhysicalVertex.java | 219 +++++++++--------
.../server/dag/physical/PipelineLocation.java | 30 +++
.../engine/server/dag/physical/PlanUtils.java | 30 ++-
.../engine/server/dag/physical/SubPlan.java | 266 ++++++++++++++-------
.../seatunnel/engine/server/master/JobMaster.java | 67 ++++--
.../server/operation/GetJobStatusOperation.java | 13 +-
.../server/scheduler/PipelineBaseScheduler.java | 106 ++++----
.../engine/server/AbstractSeaTunnelServerTest.java | 5 +-
.../server/checkpoint/CheckpointPlanTest.java | 10 +-
.../seatunnel/engine/server/dag/TaskTest.java | 9 +-
.../engine/server/master/JobMasterTest.java | 88 +++++++
25 files changed, 995 insertions(+), 408 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
index d04939a70..a5be3aa2a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
@@ -112,7 +112,7 @@ public class LocalTxtTransactionStateFileWriter extends AbstractTransactionState
FileUtils.createFile(filePath);
fileOutputStream = new FileOutputStream(new File(filePath));
beingWrittenOutputStream.put(filePath, fileOutputStream);
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.error("can not get output file stream");
throw new RuntimeException(e);
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 1e49074ba..3cfb808b7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -41,7 +41,7 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index d946739b6..2776e3fd1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -47,7 +47,7 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 0a52e7a0d..1fec906eb 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -47,7 +47,7 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
index 7dc125e77..1979a6fc4 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
@@ -40,7 +40,7 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index 258c32352..575ac8b56 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -46,7 +46,7 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index e2634dfa8..c5d51fb17 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -46,7 +46,7 @@ transform {
sink {
LocalFile {
- path = "file:///tmp/hive/warehouse/test2"
+ path = "/tmp/hive/warehouse/test2"
field_delimiter = "\t"
row_delimiter = "\n"
partition_by = ["age"]
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index d0c9fd963..f9dbfb4c6 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -29,6 +29,9 @@ public enum JobStatus {
/** Job is newly created, no task has started to run. */
CREATED(EndState.NOT_END),
+ /** Job has started scheduling, but some tasks have not finished deploying. */
+ SCHEDULED(EndState.NOT_END),
+
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
RUNNING(EndState.NOT_END),
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java
new file mode 100644
index 000000000..00e7a5dd5
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+
+@AllArgsConstructor
+@Data
+public class RunningJobInfo implements IdentifiedDataSerializable {
+ private Long initializationTimestamp;
+
+ private com.hazelcast.internal.serialization.Data jobImmutableInformation;
+
+ public RunningJobInfo() {}
+
+ @Override
+ public int getFactoryId() {
+ return JobDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return JobDataSerializerHook.RUNNING_JOB_INFO;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ out.writeLong(initializationTimestamp);
+ IOUtil.writeData(out, jobImmutableInformation);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ initializationTimestamp = in.readLong();
+ jobImmutableInformation = IOUtil.readData(in);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 1ff1c3288..6fb2da54a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -57,6 +58,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
*/
public static final int JOB_IMMUTABLE_INFORMATION = 3;
+ public static final int RUNNING_JOB_INFO = 4;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
@@ -85,6 +88,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
return new LogicalEdge();
case JOB_IMMUTABLE_INFORMATION:
return new JobImmutableInformation();
+ case RUNNING_JOB_INFO:
+ return new RunningJobInfo();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index e1632ae8f..21ce26577 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,13 +23,16 @@ import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
@@ -43,19 +46,24 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import lombok.NonNull;
-import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
@@ -73,12 +81,50 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private final SeaTunnelConfig seaTunnelConfig;
+ /**
+ * IMap key is jobId and value is a {@link RunningJobInfo}
+ * This IMap is used to recover runningJobInfoIMap in JobMaster when a new master node becomes active.
+ */
+ private IMap<Long, RunningJobInfo> runningJobInfoIMap;
+
+ /**
+ * IMap key is one of: jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation}, or
+ * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
+ * <p>
+ * The value of IMap is one of: {@link JobStatus}, {@link org.apache.seatunnel.engine.core.job.PipelineState}, or
+ * {@link org.apache.seatunnel.engine.server.execution.ExecutionState}
+ * <p>
+ * This IMap is used to recover runningJobStateIMap in JobMaster when a new master node becomes active.
+ */
+ IMap<Object, Object> runningJobStateIMap;
+
+ /**
+ * IMap key is one of: jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation}, or
+ * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
+ * <p>
+ * The value of IMap is one of: the {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps,
+ * the {@link org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps, or
+ * the {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps.
+ * <p>
+ * This IMap is used to recover runningJobStateTimestampsIMap in JobMaster when a new master node becomes active.
+ */
+ IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
/**
* key: job id;
* <br> value: job master;
*/
private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
+ /**
+ * IMap key is {@link PipelineLocation}
+ * <p>
+ * The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
+ * <p>
+ * This IMap is used to recover ownedSlotProfilesIMap in JobMaster when a new master node becomes active.
+ */
+ private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
this.logger = node.getLogger(getClass());
this.liveOperationRegistry = new LiveOperationRegistry();
@@ -109,6 +155,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return runningJobMasterMap.get(jobId);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
@Override
public void init(NodeEngine engine, Properties hzProperties) {
this.nodeEngine = (NodeEngineImpl) engine;
@@ -118,6 +165,14 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
);
taskExecutionService.start();
getSlotService();
+
+ runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+ runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+ runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+ ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+
+ ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ service.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
}
@Override
@@ -190,12 +245,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
*/
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
- JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService, getResourceManager());
+ JobMaster jobMaster = new JobMaster(jobImmutableInformation,
+ this.nodeEngine,
+ executorService,
+ getResourceManager(),
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap,
+ ownedSlotProfilesIMap);
executorService.submit(() -> {
try {
- jobMaster.init();
- jobMaster.getPhysicalPlan().initStateFuture();
+ runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
+ jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+ jobMaster.getPhysicalPlan().initStateFuture();
} catch (Throwable e) {
LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
voidCompletableFuture.completeExceptionally(e);
@@ -207,12 +269,49 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
try {
jobMaster.run();
} finally {
+ // TODO: store job state info to HistoryStorage; for now the job's IMap entries are only removed
+ removeJobIMap(jobMaster);
runningJobMasterMap.remove(jobId);
}
});
return new PassiveCompletableFuture(voidCompletableFuture);
}
+ private void removeJobIMap(JobMaster jobMaster) {
+ Long jobId = jobMaster.getJobImmutableInformation().getJobId();
+ runningJobStateTimestampsIMap.remove(jobId);
+
+ jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+ runningJobStateIMap.remove(pipeline.getPipelineLocation());
+ runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
+ pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+ runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
+ runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ runningJobStateIMap.remove(task.getTaskGroupLocation());
+ runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
+ });
+ });
+
+ // These entries must be deleted last. On a new master node:
+ // 1. If runningJobStateIMap.get(jobId) == null and runningJobInfoIMap.get(jobId) != null, we will call
+ // runningJobInfoIMap.remove(jobId).
+ //
+ // 2. If runningJobStateIMap.get(jobId) != null and the value is a JobStatus end state, we need to create a new
+ // JobMaster, generate the PhysicalPlan again, and then try to remove all PipelineLocation and
+ // TaskGroupLocation keys from runningJobStateIMap.
+ //
+ // 3. If runningJobStateIMap.get(jobId) != null and the value equals JobStatus.SCHEDULED, we need to cancel the job
+ // and then call submitJob(long jobId, Data jobImmutableInformation) to resubmit it.
+ //
+ // 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING or RUNNING, we need to recover the
+ // JobMaster from runningJobStateIMap and then wait for it to complete.
+ runningJobStateIMap.remove(jobId);
+ runningJobInfoIMap.remove(jobId);
+ }
+
public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
@@ -245,34 +344,47 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
// TODO Get Job Status from JobHistoryStorage
return JobStatus.FINISHED;
}
- return runningJobMaster.getJobStatus();
+ // This method is called from an operation, and runningJobMaster.getJobStatus() reads data from an IMap.
+ // Doing that on the operation thread causes the error "Waiting for response on this thread is illegal", so we
+ // run runningJobMaster.getJobStatus() on another thread.
+ CompletableFuture<JobStatus> future = CompletableFuture.supplyAsync(() -> {
+ return runningJobMaster.getJobStatus();
+ }, executorService);
+
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
Address lostAddress = event.getMember().getAddress();
runningJobMasterMap.forEach((aLong, jobMaster) -> {
jobMaster.getPhysicalPlan().getPipelineList().forEach(subPlan -> {
- ArrayList<PhysicalVertex> allVertex = new ArrayList<>();
- allVertex.addAll(subPlan.getPhysicalVertexList());
- allVertex.addAll(subPlan.getCoordinatorVertexList());
- allVertex.forEach(physicalVertex -> {
- Address deployAddress = physicalVertex.getCurrentExecutionAddress();
- ExecutionState executionState = physicalVertex.getExecutionState().get();
- if (null != deployAddress && deployAddress.equals(lostAddress) &&
- (executionState.equals(ExecutionState.DEPLOYING) ||
- executionState.equals(ExecutionState.RUNNING))) {
- TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
- physicalVertex.updateTaskExecutionState(
- new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
- new SeaTunnelEngineException(
- String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
- lostAddress))));
- }
- });
+ makeTasksFailed(subPlan.getCoordinatorVertexList(), lostAddress);
+ makeTasksFailed(subPlan.getPhysicalVertexList(), lostAddress);
});
});
}
+ private void makeTasksFailed(@NonNull List<PhysicalVertex> physicalVertexList, @NonNull Address lostAddress) {
+ physicalVertexList.forEach(physicalVertex -> {
+ Address deployAddress = physicalVertex.getCurrentExecutionAddress();
+ ExecutionState executionState = physicalVertex.getExecutionState();
+ if (null != deployAddress && deployAddress.equals(lostAddress) &&
+ (executionState.equals(ExecutionState.DEPLOYING) ||
+ executionState.equals(ExecutionState.RUNNING))) {
+ TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
+ physicalVertex.updateTaskExecutionState(
+ new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
+ new SeaTunnelEngineException(
+ String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
+ lostAddress))));
+ }
+ });
+ }
+
/**
* When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
*/
@@ -284,4 +396,33 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
}
runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
+
+ private void printExecutionInfo() {
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+ int activeCount = threadPoolExecutor.getActiveCount();
+ int corePoolSize = threadPoolExecutor.getCorePoolSize();
+ int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
+ int poolSize = threadPoolExecutor.getPoolSize();
+ long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
+ long taskCount = threadPoolExecutor.getTaskCount();
+ StringBuffer sbf = new StringBuffer();
+ sbf.append("activeCount=")
+ .append(activeCount)
+ .append("\n")
+ .append("corePoolSize=")
+ .append(corePoolSize)
+ .append("\n")
+ .append("maximumPoolSize=")
+ .append(maximumPoolSize)
+ .append("\n")
+ .append("poolSize=")
+ .append(poolSize)
+ .append("\n")
+ .append("completedTaskCount=")
+ .append(completedTaskCount)
+ .append("\n")
+ .append("taskCount=")
+ .append(taskCount);
+ logger.info(sbf.toString());
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index eb9d1c07f..d2d8b67d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -83,7 +83,8 @@ public class TaskExecutionService {
private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskID
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
- private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures =
+ new ConcurrentHashMap<>();
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -126,17 +127,18 @@ public class TaskExecutionService {
uncheckRun(startedLatch::await);
}
- public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
+ public PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
TaskGroupImmutableInformation taskImmutableInfo =
nodeEngine.getSerializationService().toObject(taskImmutableInformation);
return deployTask(taskImmutableInfo);
}
public <T extends Task> T getTask(TaskLocation taskLocation) {
- return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup().getTask(taskLocation.getTaskID());
+ return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
+ .getTask(taskLocation.getTaskID());
}
- public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
+ public PassiveCompletableFuture<TaskExecutionState> deployTask(
@NonNull TaskGroupImmutableInformation taskImmutableInfo) {
CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
TaskGroup taskGroup = null;
@@ -146,21 +148,30 @@ public class TaskExecutionService {
if (!CollectionUtils.isEmpty(jars)) {
classLoader = new SeatunnelChildFirstClassLoader(Lists.newArrayList(jars));
taskGroup =
- CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(), classLoader,
+ CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+ classLoader,
taskImmutableInfo.getGroup());
} else {
taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
}
- if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
- throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
+ logger.info(String.format("deploying task %s", taskGroup.getTaskGroupLocation()));
+
+ synchronized (this) {
+ if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
+ throw new RuntimeException(
+ String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
+ }
+ return deployLocalTask(taskGroup, resultFuture, classLoader);
}
- return deployLocalTask(taskGroup, resultFuture, classLoader);
} catch (Throwable t) {
logger.severe(String.format("TaskGroupID : %s deploy error with Exception: %s",
- taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
+ taskGroup != null && taskGroup.getTaskGroupLocation() != null ?
+ taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
ExceptionUtils.getMessage(t)));
resultFuture.complete(
- new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() : null, ExecutionState.FAILED, t));
+ new TaskExecutionState(
+ taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() :
+ null, ExecutionState.FAILED, t));
}
return new PassiveCompletableFuture<>(resultFuture);
}
@@ -205,7 +216,8 @@ public class TaskExecutionService {
long sleepTime = 1000;
do {
if (null != invoke) {
- logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis", taskGroup.getTaskGroupLocation(), sleepTime));
+ logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
+ taskGroup.getTaskGroupLocation(), sleepTime));
try {
Thread.sleep(sleepTime += 1000);
} catch (InterruptedException e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 296ad2e1f..b19e7d688 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -26,20 +26,24 @@ import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import lombok.NonNull;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class PhysicalPlan {
private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class);
+ /**
+ * The maximum number of restores allowed for a pipeline.
+ */
+ public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set by config
private final List<SubPlan> pipelineList;
@@ -49,14 +53,9 @@ public class PhysicalPlan {
private AtomicInteger failedPipelineNum = new AtomicInteger(0);
- private AtomicReference<JobStatus> jobStatus = new AtomicReference<>();
-
private final JobImmutableInformation jobImmutableInformation;
- /**
- * If the job or pipeline cancel by user, needRestore will be false
- **/
- private volatile boolean needRestore = true;
+ private final IMap<Object, Object> runningJobStateIMap;
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -64,7 +63,7 @@ public class PhysicalPlan {
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
*/
- private final long[] stateTimestamps;
+ private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
/**
* when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan
@@ -76,10 +75,17 @@ public class PhysicalPlan {
private final String jobFullName;
- private JobMaster jobMaster;
+ private final long jobId;
private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;
+ private JobMaster jobMaster;
+
+ /**
+ * If the job or pipeline cancel by user, needRestore will be false
+ **/
+ private volatile boolean needRestore = true;
+
/**
* Whether we make the job end when pipeline turn to end state.
*/
@@ -88,13 +94,28 @@ public class PhysicalPlan {
public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation jobImmutableInformation,
- long initializationTimestamp) {
+ long initializationTimestamp,
+ @NonNull IMap runningJobStateIMap,
+ @NonNull IMap runningJobStateTimestampsIMap) {
this.executorService = executorService;
this.jobImmutableInformation = jobImmutableInformation;
- stateTimestamps = new long[JobStatus.values().length];
- this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
- this.jobStatus.set(JobStatus.CREATED);
- this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+ this.jobId = jobImmutableInformation.getJobId();
+ Long[] stateTimestamps = new Long[JobStatus.values().length];
+ if (runningJobStateTimestampsIMap.get(jobId) == null) {
+ stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+ runningJobStateTimestampsIMap.put(jobId, stateTimestamps);
+ }
+
+ if (runningJobStateIMap.get(jobId) == null) {
+ // We must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap.
+ // Because if a new Master Node become active, we can recover ExecutionState and PipelineState and JobStatus
+ // from TaskExecutionService. But we can not recover stateTimestamps.
+ stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.put(jobId, stateTimestamps);
+
+ runningJobStateIMap.put(jobId, JobStatus.CREATED);
+ }
+
this.jobEndFuture = new CompletableFuture<>();
this.pipelineList = pipelineList;
if (pipelineList.isEmpty()) {
@@ -103,24 +124,27 @@ public class PhysicalPlan {
this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId());
- pipelineSchedulerFutureMap = new HashMap<>(pipelineList.size());
+ pipelineSchedulerFutureMap = new ConcurrentHashMap<>(pipelineList.size());
+ this.runningJobStateIMap = runningJobStateIMap;
+ this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
public void setJobMaster(JobMaster jobMaster) {
this.jobMaster = jobMaster;
+ pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
}
public void initStateFuture() {
pipelineList.forEach(subPlan -> addPipelineEndCallback(subPlan));
}
- private void addPipelineEndCallback(SubPlan subPlan) {
+ public void addPipelineEndCallback(SubPlan subPlan) {
PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
future.thenAcceptAsync(pipelineState -> {
try {
if (PipelineState.CANCELED.equals(pipelineState)) {
- if (needRestore) {
- restorePipeline(subPlan);
+ if (canRestorePipeline(subPlan)) {
+ subPlan.restorePipeline();
return;
}
canceledPipelineNum.incrementAndGet();
@@ -131,17 +155,17 @@ public class PhysicalPlan {
cancelJob();
}
LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
- jobMaster.releasePipelineResource(subPlan.getPipelineId());
+ jobMaster.releasePipelineResource(subPlan);
} else if (PipelineState.FAILED.equals(pipelineState)) {
- if (needRestore) {
- restorePipeline(subPlan);
+ if (canRestorePipeline(subPlan)) {
+ subPlan.restorePipeline();
return;
}
failedPipelineNum.incrementAndGet();
if (makeJobEndWhenPipelineEnded) {
cancelJob();
}
- jobMaster.releasePipelineResource(subPlan.getPipelineId());
+ jobMaster.releasePipelineResource(subPlan);
LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
}
} catch (Throwable e) {
@@ -157,38 +181,50 @@ public class PhysicalPlan {
} else {
turnToEndState(JobStatus.FINISHED);
}
- jobEndFuture.complete(jobStatus.get());
+ jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
}
});
}
+ private boolean canRestorePipeline(SubPlan subPlan) {
+ return needRestore && subPlan.getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM;
+ }
+
public void cancelJob() {
- if (jobStatus.get().isEndState()) {
- LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, jobStatus.get()));
+ if (getJobStatus().isEndState()) {
+ LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, getJobStatus()));
+ return;
+ }
+
+ // If an active Master Node done and another Master Node active, we can not know whether cancelRunningJob
+ // complete. So we need cancelRunningJob again.
+ if (JobStatus.CANCELLING.equals(getJobStatus())) {
+ cancelJobPipelines();
return;
}
- updateJobState(jobStatus.get(), JobStatus.CANCELLING);
+ updateJobState((JobStatus) runningJobStateIMap.get(jobId), JobStatus.CANCELLING);
cancelJobPipelines();
}
private void cancelJobPipelines() {
List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
- if (!pipeline.getPipelineState().get().isEndState() &&
- !PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
- CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
- pipeline.cancelPipeline();
- return null;
- }, executorService);
- return future;
+ if (PipelineState.CANCELING.equals(pipeline.getPipelineState()) ||
+ pipeline.getPipelineState().isEndState()) {
+ LOGGER.info(String.format("%s already in state %s", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState()));
+ return null;
}
- return null;
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ pipeline.cancelPipeline();
+ });
+ return future;
}).filter(x -> x != null).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
- collect.toArray(new CompletableFuture[collect.size()]));
- voidCompletableFuture.get();
+ collect.toArray(new CompletableFuture[0]));
+ voidCompletableFuture.join();
} catch (Exception e) {
LOGGER.severe(
String.format("%s cancel error with exception: %s", jobFullName, ExceptionUtils.getMessage(e)));
@@ -199,75 +235,68 @@ public class PhysicalPlan {
return pipelineList;
}
- public boolean turnToRunning() {
- return updateJobState(JobStatus.CREATED, JobStatus.RUNNING);
- }
-
private void turnToEndState(@NonNull JobStatus endState) {
- // consistency check
- if (jobStatus.get().isEndState()) {
- String message = "Job is trying to leave terminal state " + jobStatus.get();
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ synchronized (this) {
+ // consistency check
+ JobStatus current = (JobStatus) runningJobStateIMap.get(jobId);
+ if (current.isEndState()) {
+ String message = "Job is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (!endState.isEndState()) {
- String message = "Need a end state, not " + endState;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (!endState.isEndState()) {
+ String message = "Need a end state, not " + endState;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- LOGGER.info(String.format("%s turn to end state %s", jobFullName, endState));
- jobStatus.set(endState);
- stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
- }
+ LOGGER.info(String.format("%s end with state %s", getJobFullName(), endState));
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ updateStateTimestamps(endState);
- public boolean updateJobState(@NonNull JobStatus targetState) {
- return updateJobState(jobStatus.get(), targetState);
+ runningJobStateIMap.put(jobId, endState);
+ }
}
- public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
- // consistency check
- if (current.isEndState()) {
- String message = "Job is trying to leave terminal state " + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ private void updateStateTimestamps(@NonNull JobStatus targetState) {
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ Long[] stateTimestamps = runningJobStateTimestampsIMap.get(jobId);
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.set(jobId, stateTimestamps);
+ }
- // now do the actual state transition
- if (jobStatus.compareAndSet(current, targetState)) {
- LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
- jobImmutableInformation.getJobConfig().getName(),
- jobImmutableInformation.getJobId(),
- current,
- targetState));
-
- stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
- return true;
- } else {
- return false;
+ public boolean updateJobState(@NonNull JobStatus targetState) {
+ synchronized (this) {
+ return updateJobState((JobStatus) runningJobStateIMap.get(jobId), targetState);
}
}
- private void restorePipeline(SubPlan subPlan) {
- try {
- LOGGER.info(String.format("Restore pipeline %s", subPlan.getPipelineFullName()));
- // We must ensure the scheduler complete and then can handle pipeline state change.
- jobMaster.getScheduleFuture().join();
-
- if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
- pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+ public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
+ synchronized (this) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Job is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
}
- subPlan.reset();
- addPipelineEndCallback(subPlan);
- pipelineSchedulerFutureMap.put(subPlan.getPipelineId(), jobMaster.reSchedulerPipeline(subPlan));
- if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
- pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+
+ // now do the actual state transition
+ if (current.equals(runningJobStateIMap.get(jobId))) {
+ LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobId,
+ current,
+ targetState));
+
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ updateStateTimestamps(targetState);
+
+ runningJobStateIMap.set(jobId, targetState);
+ return true;
+ } else {
+ return false;
}
- } catch (Throwable e) {
- LOGGER.severe(String.format("Restore pipeline %s error with exception: %s", subPlan.getPipelineFullName(),
- ExceptionUtils.getMessage(e)));
- subPlan.cancelPipeline();
}
}
@@ -280,7 +309,7 @@ public class PhysicalPlan {
}
public JobStatus getJobStatus() {
- return jobStatus.get();
+ return (JobStatus) runningJobStateIMap.get(jobId);
}
public String getJobFullName() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 25c4c8c7e..90bf6336f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -54,6 +54,7 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
@@ -110,12 +111,18 @@ public class PhysicalPlanGenerator {
*/
private final Set<TaskLocation> startingTasks;
+ private final IMap<Object, Object> runningJobStateIMap;
+
+ private final IMap<Object, Object> runningJobStateTimestampsIMap;
+
public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
@NonNull NodeEngine nodeEngine,
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
@NonNull ExecutorService executorService,
- @NonNull FlakeIdGenerator flakeIdGenerator) {
+ @NonNull FlakeIdGenerator flakeIdGenerator,
+ @NonNull IMap runningJobStateIMap,
+ @NonNull IMap runningJobStateTimestampsIMap) {
this.pipelines = executionPlan.getPipelines();
this.nodeEngine = nodeEngine;
this.jobImmutableInformation = jobImmutableInformation;
@@ -125,6 +132,8 @@ public class PhysicalPlanGenerator {
// the checkpoint of a pipeline
this.pipelineTasks = new HashSet<>();
this.startingTasks = new HashSet<>();
+ this.runningJobStateIMap = runningJobStateIMap;
+ this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
@@ -170,13 +179,17 @@ public class PhysicalPlanGenerator {
physicalVertexList,
coordinatorVertexList,
jobImmutableInformation,
- executorService);
+ executorService,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap);
});
PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
executorService,
jobImmutableInformation,
- initializationTimestamp);
+ initializationTimestamp,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap);
return Tuple2.tuple2(physicalPlan, checkpointPlans);
}
@@ -227,7 +240,9 @@ public class PhysicalPlanGenerator {
s.getJarUrls(),
jobImmutableInformation,
initializationTimestamp,
- nodeEngine);
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap);
} else {
return null;
}
@@ -267,7 +282,9 @@ public class PhysicalPlanGenerator {
seaTunnelTask.getJarsUrl(),
jobImmutableInformation,
initializationTimestamp,
- nodeEngine));
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
}
return t.stream();
}).collect(Collectors.toList());
@@ -303,7 +320,9 @@ public class PhysicalPlanGenerator {
t.getJarsUrl(),
jobImmutableInformation,
initializationTimestamp,
- nodeEngine);
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap);
}).collect(Collectors.toList());
}
@@ -363,7 +382,9 @@ public class PhysicalPlanGenerator {
jars,
jobImmutableInformation,
initializationTimestamp,
- nodeEngine));
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
} else {
t.add(new PhysicalVertex(
i,
@@ -378,7 +399,9 @@ public class PhysicalPlanGenerator {
jars,
jobImmutableInformation,
initializationTimestamp,
- nodeEngine));
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
}
}
return t.stream();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 6838df1f6..b4386f14a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -37,6 +36,7 @@ import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
@@ -44,7 +44,6 @@ import java.net.URL;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
@@ -79,7 +78,7 @@ public class PhysicalVertex {
private final Set<URL> pluginJarsUrls;
- private AtomicReference<ExecutionState> executionState = new AtomicReference<>();
+ private final IMap<Object, Object> runningJobStateIMap;
/**
* When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
@@ -93,7 +92,7 @@ public class PhysicalVertex {
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
*/
- private final long[] stateTimestamps;
+ private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
private final JobImmutableInformation jobImmutableInformation;
@@ -103,6 +102,8 @@ public class PhysicalVertex {
private Address currentExecutionAddress;
+ private TaskGroupImmutableInformation taskGroupImmutableInformation;
+
public PhysicalVertex(int subTaskGroupIndex,
@NonNull ExecutorService executorService,
int parallelism,
@@ -113,7 +114,10 @@ public class PhysicalVertex {
Set<URL> pluginJarsUrls,
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
- @NonNull NodeEngine nodeEngine) {
+ @NonNull NodeEngine nodeEngine,
+ @NonNull IMap runningJobStateIMap,
+ @NonNull IMap runningJobStateTimestampsIMap) {
+ this.taskGroupLocation = taskGroup.getTaskGroupLocation();
this.subTaskGroupIndex = subTaskGroupIndex;
this.executorService = executorService;
this.parallelism = parallelism;
@@ -124,10 +128,22 @@ public class PhysicalVertex {
this.pluginJarsUrls = pluginJarsUrls;
this.jobImmutableInformation = jobImmutableInformation;
this.initializationTimestamp = initializationTimestamp;
- stateTimestamps = new long[ExecutionState.values().length];
- this.stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
- this.executionState.set(ExecutionState.CREATED);
- this.stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+
+ Long[] stateTimestamps = new Long[ExecutionState.values().length];
+ if (runningJobStateTimestampsIMap.get(taskGroup.getTaskGroupLocation()) == null) {
+ stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
+ runningJobStateTimestampsIMap.put(taskGroup.getTaskGroupLocation(), stateTimestamps);
+
+ }
+
+ if (runningJobStateIMap.get(taskGroupLocation) == null) {
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.put(taskGroupLocation, stateTimestamps);
+
+ runningJobStateIMap.put(taskGroupLocation, ExecutionState.CREATED);
+ }
+
this.nodeEngine = nodeEngine;
this.taskFullName =
String.format(
@@ -140,10 +156,17 @@ public class PhysicalVertex {
subTaskGroupIndex + 1,
parallelism);
this.taskFuture = new CompletableFuture<>();
- this.taskGroupLocation = taskGroup.getTaskGroupLocation();
+
+ this.runningJobStateIMap = runningJobStateIMap;
+ this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+ ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ // If the task state is CANCELING we need call noticeTaskExecutionServiceCancel().
+ if (ExecutionState.CANCELING.equals(executionState)) {
+ noticeTaskExecutionServiceCancel();
+ }
this.taskFuture = new CompletableFuture<>();
return new PassiveCompletableFuture<>(this.taskFuture);
}
@@ -187,31 +210,12 @@ public class PhysicalVertex {
private void deployInternal(Consumer<TaskGroupImmutableInformation> taskGroupConsumer) {
TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
- if (ExecutionState.DEPLOYING.equals(executionState.get())) {
- taskGroupConsumer.accept(taskGroupImmutableInformation);
- // may be canceling
- if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
- // If we found the task state turned to CANCELING after deployed to TaskExecutionService. We need
- // notice the TaskExecutionService to cancel this task.
- noticeTaskExecutionServiceCancel();
- if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
- turnToEndState(ExecutionState.CANCELED);
- taskFuture.complete(
- new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
- } else {
- turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
- new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
- this.getTaskFullName(), executionState.get()))));
- }
+ synchronized (this) {
+ ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ if (ExecutionState.DEPLOYING.equals(currentState)) {
+ taskGroupConsumer.accept(taskGroupImmutableInformation);
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
}
- } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
- turnToEndState(ExecutionState.CANCELED);
- taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
- } else {
- turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
- new JobException(String.format("%s turn to a unexpected state", getTaskFullName()))));
}
}
@@ -231,63 +235,71 @@ public class PhysicalVertex {
}
private boolean turnToEndState(@NonNull ExecutionState endState) {
- // consistency check
- if (executionState.get().isEndState()) {
- String message = String.format("Task %s is already in terminal state %s", taskFullName, executionState.get());
- LOGGER.warning(message);
- return false;
- }
- if (!endState.isEndState()) {
- String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
- LOGGER.warning(message);
- return false;
- }
+ synchronized (this) {
+ // consistency check
+ ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ if (currentState.isEndState()) {
+ String message = String.format("Task %s is already in terminal state %s", taskFullName, currentState);
+ LOGGER.warning(message);
+ return false;
+ }
+ if (!endState.isEndState()) {
+ String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+ LOGGER.warning(message);
+ return false;
+ }
+
+ LOGGER.info(String.format("%s turn to end state %s.",
+ taskFullName,
+ endState));
+ updateStateTimestamps(endState);
- LOGGER.info(String.format("%s turn to end state %s.",
- taskFullName,
- endState));
- executionState.set(endState);
- stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
- return true;
+ runningJobStateIMap.set(taskGroupLocation, endState);
+ return true;
+ }
}
public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
- // consistency check
- if (current.isEndState()) {
- String message = "Task is trying to leave terminal state " + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ synchronized (this) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Task is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
- String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
+ String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
- String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
+ String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
- String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
+ String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- // now do the actual state transition
- if (executionState.compareAndSet(current, targetState)) {
- LOGGER.info(String.format("%s turn from state %s to %s.",
- taskFullName,
- current,
- targetState));
+ // now do the actual state transition
+ if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
+ LOGGER.info(String.format("%s turn from state %s to %s.",
+ taskFullName,
+ current,
+ targetState));
- stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
- return true;
- } else {
- return false;
+ updateStateTimestamps(targetState);
+
+ runningJobStateIMap.set(taskGroupLocation, targetState);
+ return true;
+ } else {
+ return false;
+ }
}
}
@@ -297,10 +309,9 @@ public class PhysicalVertex {
public void cancel() {
if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
- updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
+ updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED) ||
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELED)) {
taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
- } else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
- // do nothing, because even if task is deployed to TaskExecutionService, we can do the cancel in deploy method
} else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
noticeTaskExecutionServiceCancel();
}
@@ -314,7 +325,7 @@ public class PhysicalVertex {
try {
i++;
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new CancelTaskOperation(taskGroup.getTaskGroupLocation()),
+ new CancelTaskOperation(taskGroupLocation),
currentExecutionAddress)
.invoke().get();
return;
@@ -330,26 +341,38 @@ public class PhysicalVertex {
}
}
+ private void updateStateTimestamps(@NonNull ExecutionState targetState) {
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ Long[] stateTimestamps = runningJobStateTimestampsIMap.get(taskGroupLocation);
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.set(taskGroupLocation, stateTimestamps);
+
+ }
+
+ public ExecutionState getExecutionState() {
+ return (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ }
+
private void resetExecutionState() {
- if (!executionState.get().isEndState()) {
- String message =
- String.format("%s reset state failed, only end state can be reset, current is %s", getTaskFullName(),
- executionState.get());
- LOGGER.severe(message);
- throw new IllegalStateException(message);
+ synchronized (this) {
+ ExecutionState executionState = getExecutionState();
+ if (!executionState.isEndState()) {
+ String message =
+ String.format("%s reset state failed, only end state can be reset, current is %s",
+ getTaskFullName(),
+ executionState);
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+ updateStateTimestamps(ExecutionState.CREATED);
+ runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
}
- executionState.set(ExecutionState.CREATED);
- stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
}
public void reset() {
resetExecutionState();
}
- public AtomicReference<ExecutionState> getExecutionState() {
- return executionState;
- }
-
public String getTaskFullName() {
return taskFullName;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
new file mode 100644
index 000000000..45609e5ce
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@Data
+public class PipelineLocation implements Serializable {
+ private long jobId;
+ private int pipelineId;
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index bd66dc91e..6dc657976 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
@@ -33,17 +34,26 @@ import java.util.concurrent.ExecutorService;
public class PlanUtils {
+ /**
+ * Builds the physical plan for a job from its logical DAG: an {@link ExecutionPlanGenerator} first turns
+ * the DAG into an execution plan, which {@link PhysicalPlanGenerator} then expands.
+ *
+ * @param runningJobStateIMap forwarded to {@link PhysicalPlanGenerator} (pipeline states; raw IMap type)
+ * @param runningJobStateTimestampsIMap forwarded to {@link PhysicalPlanGenerator} (state timestamps; raw IMap type)
+ * @return the physical plan and a map of checkpoint plans (keyed by Integer, presumably the pipeline id — confirm)
+ */
public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> fromLogicalDAG(@NonNull LogicalDag logicalDag,
- @NonNull NodeEngine nodeEngine,
- @NonNull JobImmutableInformation jobImmutableInformation,
- long initializationTimestamp,
- @NonNull ExecutorService executorService,
- @NonNull FlakeIdGenerator flakeIdGenerator) {
+ @NonNull NodeEngine nodeEngine,
+ @NonNull
+ JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull ExecutorService executorService,
+ @NonNull FlakeIdGenerator flakeIdGenerator,
+ @NonNull IMap runningJobStateIMap,
+ @NonNull IMap runningJobStateTimestampsIMap) {
return new PhysicalPlanGenerator(
- new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
- nodeEngine,
+ new ExecutionPlanGenerator(
+ logicalDag,
jobImmutableInformation,
- initializationTimestamp,
- executorService,
- flakeIdGenerator).generate();
+ initializationTimestamp)
+ .generate(),
+ nodeEngine,
+ jobImmutableInformation,
+ initializationTimestamp,
+ executorService,
+ flakeIdGenerator,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap).generate();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ff7264636..b754f50ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -23,16 +23,17 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import lombok.NonNull;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class SubPlan {
@@ -54,17 +55,17 @@ public class SubPlan {
private AtomicInteger failedTaskNum = new AtomicInteger(0);
- private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
-
private final String pipelineFullName;
+ // PipelineLocation -> current PipelineState of this pipeline; read by getPipelineState()
+ private final IMap<Object, Object> runningJobStateIMap;
+
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
* pipeline transitioned into a certain state. The index into this array is the ordinal
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
*/
- private final long[] stateTimestamps;
+ private final IMap<Object, Long[]> runningJobStateTimestampsIMap; // PipelineLocation -> the timestamps array described above
/**
* Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
@@ -72,24 +73,48 @@ public class SubPlan {
*/
private CompletableFuture<PipelineState> pipelineFuture;
+ private final PipelineLocation pipelineLocation;
+
private final ExecutorService executorService;
+ private JobMaster jobMaster;
+
+ private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
+
+ private Integer pipelineRestoreNum; // number of restorePipeline() calls; not a safe lock object (boxed, replaced on ++)
+
public SubPlan(int pipelineId,
int totalPipelineNum,
long initializationTimestamp,
@NonNull List<PhysicalVertex> physicalVertexList,
@NonNull List<PhysicalVertex> coordinatorVertexList,
@NonNull JobImmutableInformation jobImmutableInformation,
- @NonNull ExecutorService executorService) {
+ @NonNull ExecutorService executorService,
+ @NonNull IMap runningJobStateIMap,
+ @NonNull IMap runningJobStateTimestampsIMap) {
this.pipelineId = pipelineId;
+ this.pipelineLocation = new PipelineLocation(jobImmutableInformation.getJobId(), pipelineId);
this.pipelineFuture = new CompletableFuture<>();
this.totalPipelineNum = totalPipelineNum;
this.physicalVertexList = physicalVertexList;
this.coordinatorVertexList = coordinatorVertexList;
- stateTimestamps = new long[PipelineState.values().length];
- this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
- this.pipelineState.set(PipelineState.CREATED);
- this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+ pipelineRestoreNum = 0;
+
+ // Reuse timestamps already stored for this pipeline (e.g. when the plan is rebuilt after a master switch)
+ // so earlier entries such as INITIALIZING are never overwritten with a fresh, empty array.
+ Long[] stateTimestamps = (Long[]) runningJobStateTimestampsIMap.get(pipelineLocation);
+ if (stateTimestamps == null) {
+ stateTimestamps = new Long[PipelineState.values().length];
+ stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
+ runningJobStateTimestampsIMap.put(pipelineLocation, stateTimestamps);
+ }
+
+ if (runningJobStateIMap.get(pipelineLocation) == null) {
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.put(pipelineLocation, stateTimestamps);
+
+ runningJobStateIMap.put(pipelineLocation, PipelineState.CREATED);
+ }
+
this.jobImmutableInformation = jobImmutableInformation;
this.pipelineFullName = String.format(
"Job %s (%s), Pipeline: [(%d/%d)]",
@@ -98,16 +123,18 @@ public class SubPlan {
pipelineId,
totalPipelineNum);
+ this.runningJobStateIMap = runningJobStateIMap;
+ this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.executorService = executorService;
}
public PassiveCompletableFuture<PipelineState> initStateFuture() {
- physicalVertexList.forEach(m -> {
- addPhysicalVertexCallBack(m.initStateFuture());
+ physicalVertexList.forEach(physicalVertex -> {
+ addPhysicalVertexCallBack(physicalVertex.initStateFuture());
});
- coordinatorVertexList.forEach(m -> {
- addPhysicalVertexCallBack(m.initStateFuture());
+ coordinatorVertexList.forEach(coordinator -> {
+ addPhysicalVertexCallBack(coordinator.initStateFuture());
});
this.pipelineFuture = new CompletableFuture<>();
@@ -124,12 +151,6 @@ public class SubPlan {
this.getPipelineFullName()));
failedTaskNum.incrementAndGet();
cancelPipeline();
- } else if (!ExecutionState.FINISHED.equals(executionState.getExecutionState())) {
- LOGGER.severe(String.format(
- "Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.",
- this.getPipelineFullName()));
- failedTaskNum.incrementAndGet();
- cancelPipeline();
}
+ // NOTE(review): the removed branch failed the pipeline for any other non-FINISHED state; without it such a
+ // state is no longer counted as a failure and does not cancel the pipeline — confirm this is intended.
if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
@@ -143,88 +164,93 @@ public class SubPlan {
turnToEndState(PipelineState.FINISHED);
LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
}
- this.pipelineFuture.complete(pipelineState.get());
+ pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation)); // completes with the state currently stored in runningJobStateIMap
}
});
}
+ /**
+ * Moves this pipeline into the given end state. The timestamp is written to runningJobStateTimestampsIMap
+ * before the state is written to runningJobStateIMap.
+ *
+ * @param endState the target state; must be an end state
+ * @throws IllegalStateException if the pipeline is already in an end state, or {@code endState} is not one
+ */
private void turnToEndState(@NonNull PipelineState endState) {
- // consistency check
- if (pipelineState.get().isEndState()) {
- String message = "Pipeline is trying to leave terminal state " + pipelineState.get();
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ synchronized (this) { // only serializes callers in this JVM; the state itself lives in the distributed IMap
+ // consistency check: a pipeline that already reached an end state must not leave it
+ PipelineState current = (PipelineState) runningJobStateIMap.get(pipelineLocation);
+ if (current.isEndState()) {
+ String message = "Pipeline is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (!endState.isEndState()) {
- String message = "Need a end state, not " + endState;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (!endState.isEndState()) {
+ String message = "Need a end state, not " + endState;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- pipelineState.set(endState);
- stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
- }
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ updateStateTimestamps(endState);
- private void resetPipelineState() {
- if (!pipelineState.get().isEndState()) {
- String message = String.format("%s reset state failed, only end state can be reset, current is %s",
- getPipelineFullName(), pipelineState.get());
- LOGGER.severe(message);
- throw new IllegalStateException(message);
+ runningJobStateIMap.set(pipelineLocation, endState);
}
-
- pipelineState.set(PipelineState.CREATED);
- stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
}
+ /**
+ * Moves the pipeline from {@code current} to {@code targetState} if the state stored in
+ * runningJobStateIMap still equals {@code current}.
+ *
+ * @return {@code true} if the transition was made, {@code false} if the stored state differs from {@code current}
+ * @throws IllegalStateException if {@code current} is an end state, or the transition skips its required
+ * predecessor (SCHEDULED needs CREATED, DEPLOYING needs SCHEDULED, RUNNING needs DEPLOYING)
+ */
public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
- // consistency check
- if (current.isEndState()) {
- String message = "Pipeline is trying to leave terminal state " + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ synchronized (this) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Pipeline is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
- String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
+ String message = "Only [CREATED] pipeline can turn to [SCHEDULED], current state is " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
- String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
+ String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING], current state is " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
- String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
- }
+ if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
+ String message = "Only [DEPLOYING] pipeline can turn to [RUNNING], current state is " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
- // now do the actual state transition
- if (pipelineState.compareAndSet(current, targetState)) {
- LOGGER.info(String.format("%s turn from state %s to %s.",
- pipelineFullName,
- current,
- targetState));
+ // now do the actual state transition (only if nobody changed the stored state in the meantime)
+ if (current.equals(runningJobStateIMap.get(pipelineLocation))) {
+ LOGGER.info(String.format("%s turn from state %s to %s.",
+ pipelineFullName,
+ current,
+ targetState));
- stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
- return true;
- } else {
- return false;
+ // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+ updateStateTimestamps(targetState);
+
+ runningJobStateIMap.set(pipelineLocation, targetState);
+ return true;
+ } else {
+ return false;
+ }
}
}
+ /**
+ * Cancels all tasks of this pipeline unless it is already in an end state or already CANCELING.
+ */
public void cancelPipeline() {
- if (pipelineState.get().isEndState()) {
- LOGGER.warning(String.format("%s is in end state %s, can not be cancel", pipelineFullName, pipelineState.get()));
+ // Read the state once: every getPipelineState() call is a separate IMap lookup and the state may change
+ // between lookups, which would make the checks below inconsistent.
+ PipelineState currentState = getPipelineState();
+ if (currentState.isEndState()) {
+ LOGGER.warning(
+ String.format("%s is in end state %s, can not be cancel", pipelineFullName, currentState));
return;
}
- updatePipelineState(pipelineState.get(), PipelineState.CANCELING);
- cancelPipelineTasks();
+ // A CANCELING pipeline already had cancelPipelineTasks() triggered by an earlier call. After a master
+ // switch, restorePipelineState() re-sends the task cancellation for such pipelines, not this method.
+ if (PipelineState.CANCELING.equals(currentState)) {
+ LOGGER.info(String.format("%s already in state CANCELING, skip cancel", pipelineFullName));
+ } else {
+ updatePipelineState(currentState, PipelineState.CANCELING);
+ cancelPipelineTasks();
+ }
}
private void cancelPipelineTasks() {
@@ -248,8 +274,8 @@ public class SubPlan {
}
private CompletableFuture<Void> cancelTask(@NonNull PhysicalVertex task) {
- if (!task.getExecutionState().get().isEndState() &&
- !ExecutionState.CANCELING.equals(task.getExecutionState().get())) {
+ // Read the task state once so both conditions are evaluated against the same value.
+ ExecutionState taskState = task.getExecutionState();
+ if (!taskState.isEndState() &&
+ !ExecutionState.CANCELING.equals(taskState)) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
task.cancel();
return null;
@@ -262,7 +288,7 @@ public class SubPlan {
/**
* Before restore a pipeline, the pipeline must do reset
*/
- public void reset() {
+ private void reset() {
resetPipelineState();
finishedTaskNum.set(0);
canceledTaskNum.set(0);
@@ -277,8 +303,66 @@ public class SubPlan {
});
}
- public int getPipelineId() {
- return pipelineId;
+ /**
+ * Stores the current time as the moment this pipeline entered {@code targetState}. Callers invoke it
+ * before writing the new state to runningJobStateIMap, so a stored state always has a timestamp.
+ */
+ private void updateStateTimestamps(@NonNull PipelineState targetState) {
+ Long[] stateTimestamps = runningJobStateTimestampsIMap.get(pipelineLocation);
+ if (stateTimestamps == null) {
+ // the constructor normally stores this entry; recreate it rather than failing with an NPE
+ stateTimestamps = new Long[PipelineState.values().length];
+ }
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ runningJobStateTimestampsIMap.set(pipelineLocation, stateTimestamps);
+ }
+
+ /**
+ * Moves the pipeline back to CREATED so it can be scheduled again.
+ *
+ * @throws IllegalStateException if the pipeline is not in an end state
+ */
+ private void resetPipelineState() {
+ PipelineState pipelineState = getPipelineState();
+ if (!pipelineState.isEndState()) {
+ String message = String.format("%s reset state failed, only end state can be reset, current is %s",
+ getPipelineFullName(), pipelineState);
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ // timestamps first, then the state itself
+ updateStateTimestamps(PipelineState.CREATED);
+ runningJobStateIMap.set(pipelineLocation, PipelineState.CREATED);
+ }
+
+ /**
+ * Restores the pipeline after it failed or was canceled by an error: waits for pending scheduling,
+ * resets it to CREATED and schedules it again. If any step throws, the pipeline is canceled instead.
+ */
+ public void restorePipeline() {
+ // Serialize restores of this pipeline on the final, per-instance pipelineLocation field. Do not lock on
+ // pipelineRestoreNum: pipelineRestoreNum++ replaces the boxed Integer (a concurrent caller would then lock
+ // a different monitor) and small Integer values are shared cached instances, so unrelated SubPlans would
+ // contend on one lock. `this` is avoided because updatePipelineState/turnToEndState synchronize on it and
+ // may be needed from other threads by the rescheduling awaited below.
+ synchronized (pipelineLocation) {
+ try {
+ pipelineRestoreNum++;
+ LOGGER.info(String.format("Restore pipeline %s", pipelineFullName));
+ // We must ensure the scheduler complete and then can handle pipeline state change.
+ jobMaster.getScheduleFuture().join();
+
+ if (reSchedulerPipelineFuture != null) {
+ reSchedulerPipelineFuture.join();
+ }
+ reset();
+ jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+ reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this);
+ if (reSchedulerPipelineFuture != null) {
+ reSchedulerPipelineFuture.join();
+ }
+ } catch (Throwable e) {
+ LOGGER.severe(
+ String.format("Restore pipeline %s error with exception: %s", pipelineFullName,
+ ExceptionUtils.getMessage(e)));
+ cancelPipeline();
+ }
+ }
+ }
+
+ /**
+ * Re-drives this pipeline after a new master node becomes active, based on the state stored in
+ * runningJobStateIMap.
+ */
+ public void restorePipelineState() {
+ // States ordered before RUNNING (by PipelineState declaration order) mean scheduling was interrupted, so the
+ // pipeline is scheduled again; a CANCELING pipeline has its task cancellation re-sent. Any other state
+ // needs no action here.
+ // NOTE(review): restorePipeline() calls reset(), whose resetPipelineState() throws unless the state is an
+ // end state; if the pre-RUNNING states are not end states, this restore always fails and the pipeline is
+ // canceled instead — confirm this is intended.
+ PipelineState pipelineState = getPipelineState();
+ if (pipelineState.ordinal() < PipelineState.RUNNING.ordinal()) {
+ restorePipeline();
+ } else if (PipelineState.CANCELING.equals(pipelineState)) {
+ cancelPipelineTasks();
+ }
}
public List<PhysicalVertex> getPhysicalVertexList() {
@@ -293,7 +377,19 @@ public class SubPlan {
return pipelineFullName;
}
- public AtomicReference<PipelineState> getPipelineState() {
- return pipelineState;
+ /**
+ * Returns the pipeline's current state as stored in runningJobStateIMap. Nothing is cached locally,
+ * so every call performs an IMap lookup.
+ */
+ public PipelineState getPipelineState() {
+ return (PipelineState) runningJobStateIMap.get(pipelineLocation);
+ }
+
+ public PipelineLocation getPipelineLocation() {
+ return pipelineLocation;
+ }
+
+ /**
+ * Sets the JobMaster that restorePipeline() uses to wait for and re-run scheduling; it must be set
+ * before restorePipeline() is called.
+ */
+ public void setJobMaster(JobMaster jobMaster) {
+ this.jobMaster = jobMaster;
+ }
+
+ /**
+ * Returns how many times restorePipeline() has been invoked for this pipeline.
+ * NOTE(review): read without holding the lock restorePipeline() uses, so the value may be stale.
+ */
+ public int getPipelineRestoreNum() {
+ return pipelineRestoreNum;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c55bc07fc..5e76f2b5a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,10 +29,11 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -46,6 +47,7 @@ import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
@@ -53,7 +55,6 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
public class JobMaster implements Runnable {
@@ -77,24 +78,37 @@ public class JobMaster implements Runnable {
private JobImmutableInformation jobImmutableInformation;
private JobScheduler jobScheduler;
- private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
+
+ /**
+ * Slots used by each pipeline's tasks, keyed by pipeline. Kept in a Hazelcast IMap so that a newly active
+ * master node can release or reuse them.
+ */
+ private final IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
+ // Passed to PlanUtils.fromLogicalDAG in init(); SubPlan stores each pipeline's PipelineState here.
+ private final IMap<Object, Object> runningJobStateIMap;
+
+ // Passed to PlanUtils.fromLogicalDAG in init(); SubPlan stores each pipeline's state timestamps here.
+ private final IMap<Object, Object> runningJobStateTimestampsIMap;
private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
- @NonNull ResourceManager resourceManager) {
+ @NonNull ResourceManager resourceManager,
+ @NonNull IMap runningJobStateIMap, // NOTE(review): raw IMap types, assigned to the typed fields unchecked
+ @NonNull IMap runningJobStateTimestampsIMap,
+ @NonNull IMap ownedSlotProfilesIMap) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
flakeIdGenerator =
this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
- this.ownedSlotProfiles = new ConcurrentHashMap<>();
+ this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
this.resourceManager = resourceManager;
+ this.runningJobStateIMap = runningJobStateIMap;
+ this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
- public void init() throws Exception {
+ /**
+ * Deserializes the job information and builds the job's physical plan and checkpoint manager.
+ *
+ * @param initializationTimestamp timestamp forwarded to PlanUtils.fromLogicalDAG when building the physical plan
+ */
+ public void init(long initializationTimestamp) throws Exception {
jobImmutableInformation = nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
@@ -113,9 +127,11 @@ public class JobMaster implements Runnable {
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
jobImmutableInformation,
- System.currentTimeMillis(),
+ initializationTimestamp,
executorService,
- flakeIdGenerator);
+ flakeIdGenerator,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap);
this.physicalPlan = planTuple.f0();
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
@@ -162,19 +178,20 @@ public class JobMaster implements Runnable {
public void handleCheckpointTimeout(long pipelineId) {
this.physicalPlan.getPipelineList().forEach(pipeline -> {
- if (pipeline.getPipelineId() == pipelineId) {
+ if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
pipeline.cancelPipeline();
}
});
}
- public CompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
- return jobScheduler.reSchedulerPipeline(subPlan);
+ /**
+ * Schedules the given pipeline again.
+ *
+ * @return a future for the rescheduling, or {@code null} if the scheduler returned no future
+ */
+ public PassiveCompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
+ CompletableFuture<Void> future = jobScheduler.reSchedulerPipeline(subPlan);
+ // Do not wrap a null future; SubPlan.restorePipeline treats a null result as "nothing to wait for".
+ return future == null ? null : new PassiveCompletableFuture<>(future);
}
- public void releasePipelineResource(int pipelineId) {
+ /**
+ * Releases the slots recorded for the pipeline and removes its entry from the slot IMap.
+ * Does nothing if no slots are recorded for it.
+ */
+ public void releasePipelineResource(SubPlan subPlan) {
+ // Read the entry once; it is missing if the pipeline never recorded slots or was already released.
+ Map<TaskGroupLocation, SlotProfile> pipelineSlotProfiles =
+ ownedSlotProfilesIMap.get(subPlan.getPipelineLocation());
+ if (pipelineSlotProfiles == null) {
+ return;
+ }
resourceManager.releaseResources(jobImmutableInformation.getJobId(),
- Lists.newArrayList(ownedSlotProfiles.get(pipelineId).values()));
+ Lists.newArrayList(pipelineSlotProfiles.values())).join();
+ ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation());
}
public void cleanJob() {
@@ -182,12 +199,12 @@ public class JobMaster implements Runnable {
}
public Address queryTaskGroupAddress(long taskGroupId) {
- for (Integer pipelineId : ownedSlotProfiles.keySet()) {
- Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineId).keySet().stream()
- .filter(task -> task.getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+ // Iterate the IMap values so each pipeline's slot map is fetched once: looking the same key up again
+ // could return null if releasePipelineResource removed the entry in between.
+ for (Map<TaskGroupLocation, SlotProfile> pipelineSlotProfiles : ownedSlotProfilesIMap.values()) {
+ Optional<TaskGroupLocation> currentVertex = pipelineSlotProfiles.keySet().stream()
+ .filter(taskGroupLocation -> taskGroupLocation.getTaskGroupId() == taskGroupId)
.findFirst();
if (currentVertex.isPresent()) {
- return ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
+ return pipelineSlotProfiles.get(currentVertex.get()).getWorker();
}
}
throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
@@ -224,7 +241,7 @@ public class JobMaster implements Runnable {
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
this.physicalPlan.getPipelineList().forEach(pipeline -> {
- if (pipeline.getPipelineId() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+ if (pipeline.getPipelineLocation().getPipelineId() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
return;
}
@@ -246,16 +263,20 @@ public class JobMaster implements Runnable {
});
}
- public Map<Integer, Map<PhysicalVertex, SlotProfile>> getOwnedSlotProfiles() {
- return ownedSlotProfiles;
+ /**
+ * Returns the slot profiles recorded for the pipeline, or {@code null} if none are recorded. IMap.get
+ * returns a copy, so changes to the returned map must be stored again with setOwnedSlotProfiles.
+ */
+ public Map<TaskGroupLocation, SlotProfile> getOwnedSlotProfiles(PipelineLocation pipelineLocation) {
+ return ownedSlotProfilesIMap.get(pipelineLocation);
}
- public void setOwnedSlotProfiles(@NonNull Integer pipelineId,
- @NonNull Map<PhysicalVertex, SlotProfile> pipelineOwnedSlotProfiles) {
- ownedSlotProfiles.put(pipelineId, pipelineOwnedSlotProfiles);
+ /**
+ * Records the slots of the pipeline's tasks in the IMap, replacing any previous entry for the pipeline.
+ */
+ public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation,
+ @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
+ ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
}
public CompletableFuture<Void> getScheduleFuture() {
return scheduleFuture;
}
+
+ /**
+ * Returns the executor service this job master was created with.
+ */
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index 9d6d4cd14..f47a3da0a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
@@ -27,6 +28,8 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
public class GetJobStatusOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
private long jobId;
@@ -65,7 +68,15 @@ public class GetJobStatusOperation extends Operation implements IdentifiedDataSe
@Override
public void run() {
SeaTunnelServer service = getService();
- response = service.getJobStatus(jobId).ordinal();
+ // NOTE(review): presumably offloaded because getJobStatus reads Hazelcast IMaps, which should not be called
+ // blocking from this operation thread — confirm. supplyAsync without an executor uses the common pool.
+ CompletableFuture<JobStatus> future = CompletableFuture.supplyAsync(() -> service.getJobStatus(jobId));
+
+ try {
+ response = future.get().ordinal();
+ } catch (InterruptedException e) {
+ // restore the interrupt flag so the calling thread can still observe the interruption
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index a3d27cd75..52dd0410e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
@@ -39,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class PipelineBaseScheduler implements JobScheduler {
@@ -58,7 +60,7 @@ public class PipelineBaseScheduler implements JobScheduler {
@Override
public void startScheduling() {
- if (physicalPlan.turnToRunning()) {
+ // The job moves CREATED -> SCHEDULED here, and SCHEDULED -> RUNNING once the futures in `collect` complete.
+ if (physicalPlan.updateJobState(JobStatus.CREATED, JobStatus.SCHEDULED)) {
List<CompletableFuture<Void>> collect =
physicalPlan.getPipelineList()
.stream()
@@ -68,6 +70,7 @@ public class PipelineBaseScheduler implements JobScheduler {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
collect.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
+ physicalPlan.updateJobState(JobStatus.SCHEDULED, JobStatus.RUNNING);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -85,50 +88,63 @@ public class PipelineBaseScheduler implements JobScheduler {
return null;
}
- Map<PhysicalVertex, SlotProfile> slotProfiles =
- getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles().get(pipeline.getPipelineId()));
+ Map<TaskGroupLocation, SlotProfile> slotProfiles =
+ getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
// To ensure release pipeline resource after new master node active, we need store slotProfiles first and then deploy tasks.
- jobMaster.setOwnedSlotProfiles(pipeline.getPipelineId(), slotProfiles);
+ jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(), slotProfiles);
// deploy pipeline
return CompletableFuture.runAsync(() -> {
deployPipeline(pipeline, slotProfiles);
- });
+ }, jobMaster.getExecutorService()); // run on the job master's executor rather than the default async pool
} catch (Exception e) {
+ // NOTE(review): the caught exception is neither logged nor rethrown; consider logging it before canceling.
pipeline.cancelPipeline();
return null;
}
}
- private Map<PhysicalVertex, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
- Map<PhysicalVertex, SlotProfile> ownedSlotProfiles) {
+ /**
+ * Returns the slot profile of every task in the pipeline, reusing the slots recorded in
+ * {@code ownedSlotProfiles} and applying for new ones where none is recorded.
+ */
+ private Map<TaskGroupLocation, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
+ Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
if (ownedSlotProfiles == null || ownedSlotProfiles.isEmpty()) {
return applyResourceForPipeline(pipeline);
}
// TODO ensure the slots still exist and is owned by this pipeline
- for (Map.Entry<PhysicalVertex, SlotProfile> entry : ownedSlotProfiles.entrySet()) {
- if (entry.getValue() == null) {
- ownedSlotProfiles.put(entry.getKey(), applyResourceForTask(entry.getKey()).join());
- } else {
- entry.getKey().updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- }
+ Map<TaskGroupLocation, SlotProfile> currentOwnedSlotProfiles = new ConcurrentHashMap<>();
+ pipeline.getCoordinatorVertexList().forEach(
+ coordinator -> putSlotProfile(currentOwnedSlotProfiles, coordinator, ownedSlotProfiles));
+
+ pipeline.getPhysicalVertexList().forEach(
+ task -> putSlotProfile(currentOwnedSlotProfiles, task, ownedSlotProfiles));
+
+ return currentOwnedSlotProfiles;
+ }
+
+ /**
+ * Adds the task's slot to {@code target}. ConcurrentHashMap rejects null values, so a task that got no slot
+ * (it was canceled or failed while applying for resources) is left out of the map instead.
+ */
+ private void putSlotProfile(@NonNull Map<TaskGroupLocation, SlotProfile> target,
+ @NonNull PhysicalVertex task,
+ Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
+ SlotProfile slotProfile = getOrApplyResourceForTask(task, ownedSlotProfiles);
+ if (slotProfile != null) {
+ target.put(task.getTaskGroupLocation(), slotProfile);
+ }
+ }
+
+ /**
+ * Returns the slot already recorded for the task, or applies for a new one. Returns {@code null} when
+ * applyResourceForTask returns no future (the task was canceled or failed to be scheduled).
+ */
+ private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex task,
+ Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
+ SlotProfile ownedSlotProfile =
+ ownedSlotProfiles == null ? null : ownedSlotProfiles.get(task.getTaskGroupLocation());
+ if (ownedSlotProfile == null) {
+ CompletableFuture<SlotProfile> future = applyResourceForTask(task);
+ return future == null ? null : future.join();
}
- return ownedSlotProfiles;
+ task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+ return ownedSlotProfile;
}
- private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
- Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
- Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+ private Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+ Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
// TODO If there is no enough resources for tasks, we need add some wait profile
subPlan.getCoordinatorVertexList()
.forEach(
- coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+ coordinator -> futures.put(coordinator.getTaskGroupLocation(), applyResourceForTask(coordinator)));
subPlan.getPhysicalVertexList()
- .forEach(task -> futures.put(task, applyResourceForTask(task)));
+ .forEach(task -> futures.put(task.getTaskGroupLocation(), applyResourceForTask(task)));
- for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+ for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+ // a null future (task canceled or failed while scheduling) is stored as a null slot in the returned map
slotProfiles.put(future.getKey(),
future.getValue() == null ? null : future.getValue().join());
}
@@ -140,20 +156,20 @@ public class PipelineBaseScheduler implements JobScheduler {
if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
// TODO custom resource size
return resourceManager.applyResource(jobId, new ResourceProfile());
- } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
- ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+ } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
+ ExecutionState.CANCELED.equals(task.getExecutionState())) { // NOTE(review): state is read twice and may change in between
LOGGER.info(
String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
ExecutionState.SCHEDULED));
return null;
} else {
- makeTaskFailed(task,
+ makeTaskFailed(task.getTaskGroupLocation(),
new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
- task.getTaskFullName(), task.getExecutionState().get())));
+ task.getTaskFullName(), task.getExecutionState())));
return null;
}
} catch (Throwable e) {
- makeTaskFailed(task, e);
+ makeTaskFailed(task.getTaskGroupLocation(), e);
return null;
}
}
@@ -164,8 +180,8 @@ public class PipelineBaseScheduler implements JobScheduler {
return CompletableFuture.runAsync(() -> {
task.deploy(slotProfile);
});
- } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
- ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+ } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
+ ExecutionState.CANCELED.equals(task.getExecutionState())) { // NOTE(review): state is read twice and may change in between
LOGGER.info(
String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
return null;
@@ -175,22 +191,24 @@ public class PipelineBaseScheduler implements JobScheduler {
task.getTaskGroupLocation(),
ExecutionState.FAILED,
new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
- task.getTaskFullName(), task.getExecutionState().get()))));
+ task.getTaskFullName(), task.getExecutionState()))));
return null;
}
}
- private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
+ private void deployPipeline(@NonNull SubPlan pipeline, Map<TaskGroupLocation, SlotProfile> slotProfiles) {
if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
try {
List<CompletableFuture<?>> deployCoordinatorFuture =
pipeline.getCoordinatorVertexList().stream()
- .map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+ .map(coordinator -> deployTask(coordinator,
+ slotProfiles.get(coordinator.getTaskGroupLocation())))
.filter(Objects::nonNull).collect(Collectors.toList());
List<CompletableFuture<?>> deployTaskFuture =
- pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, slotProfiles.get(task)))
+ pipeline.getPhysicalVertexList().stream()
+ .map(task -> deployTask(task, slotProfiles.get(task.getTaskGroupLocation())))
.filter(Objects::nonNull).collect(Collectors.toList());
deployCoordinatorFuture.addAll(deployTaskFuture);
@@ -200,20 +218,20 @@ public class PipelineBaseScheduler implements JobScheduler {
if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
LOGGER.info(
String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get()));
+ pipeline.getPipelineState()));
}
} catch (Exception e) {
makePipelineFailed(pipeline, e);
}
- } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
- PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+ } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState()) ||
+ PipelineState.CANCELED.equals(pipeline.getPipelineState())) {
// may be canceled
LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get(), PipelineState.DEPLOYING));
+ pipeline.getPipelineState(), PipelineState.DEPLOYING));
} else {
makePipelineFailed(pipeline, new JobException(
String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get())));
+ pipeline.getPipelineState())));
}
}
@@ -223,32 +241,32 @@ public class PipelineBaseScheduler implements JobScheduler {
}
+ /**
+ * Handles a pipeline whose transition to {@code targetState} did not happen.
+ * <p>
+ * If the pipeline is CANCELING or CANCELED the skip is only logged; any other state is treated
+ * as unexpected and aborts scheduling.
+ *
+ * @param pipeline the pipeline whose state transition failed
+ * @param targetState the state the scheduler tried to move the pipeline to
+ * @throws JobException if the pipeline is in a state other than CANCELING or CANCELED
+ */
private void handlePipelineStateTurnError(SubPlan pipeline, PipelineState targetState) {
+ // Read the state once so the cancel check and the logged/thrown state agree, even if the
+ // state changes concurrently (e.g. a cancel) between reads.
+ PipelineState currentState = pipeline.getPipelineState();
- if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
- PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+ if (PipelineState.CANCELING.equals(currentState) ||
+ PipelineState.CANCELED.equals(currentState)) {
// may be canceled
LOGGER.info(
String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get(), targetState));
+ currentState, targetState));
} else {
throw new JobException(
String.format("%s turn to a unexpected state: %s, stop scheduler job",
pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get()));
+ currentState));
}
}
+ /**
+ * Reports every coordinator vertex and every physical vertex of {@code pipeline} as failed
+ * with cause {@code e}, one makeTaskFailed call per task group location.
+ *
+ * @param pipeline the pipeline whose tasks are marked failed
+ * @param e the failure cause attached to each reported task state
+ */
private void makePipelineFailed(@NonNull SubPlan pipeline, Throwable e) {
pipeline.getCoordinatorVertexList().forEach(coordinator -> {
- makeTaskFailed(coordinator, e);
+ makeTaskFailed(coordinator.getTaskGroupLocation(), e);
});
pipeline.getPhysicalVertexList().forEach(task -> {
- makeTaskFailed(task, e);
+ makeTaskFailed(task.getTaskGroupLocation(), e);
});
}
+ /**
+ * Reports the task group at {@code taskGroupLocation} as {@link ExecutionState#FAILED} by
+ * passing a {@link TaskExecutionState} to {@code jobMaster.updateTaskExecutionState}.
+ *
+ * @param taskGroupLocation location of the failed task group
+ * @param e the failure cause carried by the reported state
+ */
- private void makeTaskFailed(@NonNull PhysicalVertex task, Throwable e) {
+ private void makeTaskFailed(@NonNull TaskGroupLocation taskGroupLocation, Throwable e) {
jobMaster.updateTaskExecutionState(
- new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FAILED, e));
+ new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, e));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 46e05e93c..1daa5fa87 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -41,8 +41,9 @@ public abstract class AbstractSeaTunnelServerTest {
@Before
public void before() {
Config config = new Config();
- config.setInstanceName(this.getClass().getSimpleName());
- config.setClusterName(this.getClass().getSimpleName());
+ long time = System.currentTimeMillis();
+ config.setInstanceName(this.getClass().getSimpleName() + "_" + time);
+ config.setClusterName(this.getClass().getSimpleName() + "_" + time);
instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
nodeEngine = instance.node.nodeEngine;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 1913f0c84..a289a8854 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import com.hazelcast.map.IMap;
import org.junit.Assert;
import org.junit.Test;
@@ -55,11 +56,18 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
JobImmutableInformation jobInfo = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+
+ IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
+ IMap<Object, Long[]> runningJobStateTimestamp =
+ nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+
Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobInfo,
System.currentTimeMillis(),
Executors.newCachedThreadPool(),
- instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f1();
+ instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+ runningJobState,
+ runningJobStateTimestamp).f1();
Assert.assertNotNull(checkpointPlans);
Assert.assertEquals(2, checkpointPlans.size());
// enum(1) + reader(2) + writer(2)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index d9bb880da..e589df31e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import com.google.common.collect.Sets;
+import com.hazelcast.map.IMap;
import org.junit.Assert;
import org.junit.Test;
@@ -97,11 +98,17 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
+ IMap<Object, Long[]> runningJobStateTimestamp =
+ nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+
PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobImmutableInformation,
System.currentTimeMillis(),
Executors.newCachedThreadPool(),
- instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
+ instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+ runningJobState,
+ runningJobStateTimestamp).f0();
Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 5c98b4bc2..060f67c70 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,15 +27,21 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.map.IMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -44,6 +50,46 @@ import java.util.concurrent.TimeUnit;
public class JobMasterTest extends AbstractSeaTunnelServerTest {
private Long jobId;
+ /**
+ * IMap key is jobId and value is that job's {@link RunningJobInfo}
+ * (presumably the JobMaster init timestamp plus the jobImmutableInformation sent by the client
+ * when the job was submitted — confirm against RunningJobInfo).
+ * <p>
+ * This IMap is used to recover runningJobInfoIMap in JobMaster when a new master node becomes active.
+ * In this test it is only read, to check that the job's entry is removed after the job completes.
+ */
+ private IMap<Long, RunningJobInfo> runningJobInfoIMap;
+
+ /**
+ * IMap key is one of jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or
+ * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+ * <p>
+ * The value is the corresponding {@link JobStatus}, {@link org.apache.seatunnel.engine.core.job.PipelineState}
+ * or {@link org.apache.seatunnel.engine.server.execution.ExecutionState}.
+ * <p>
+ * This IMap is used to recover runningJobStateIMap in JobMaster when a new master node becomes active.
+ */
+ private IMap<Object, Object> runningJobStateIMap;
+
+ /**
+ * IMap key is one of jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or
+ * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+ * <p>
+ * The value is the stateTimestamps of the corresponding
+ * {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan},
+ * {@link org.apache.seatunnel.engine.server.dag.physical.SubPlan} or
+ * {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex}.
+ * <p>
+ * This IMap is used to recover runningJobStateTimestampsIMap in JobMaster when a new master node becomes active.
+ */
+ private IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
+ /**
+ * IMap key is {@link PipelineLocation}.
+ * <p>
+ * The value is a map from each {@link TaskGroupLocation} to the {@link SlotProfile} it uses.
+ * <p>
+ * This IMap is used to recover ownedSlotProfilesIMap in JobMaster when a new master node becomes active.
+ */
+ private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
+
@Before
public void before() {
super.before();
@@ -90,5 +136,47 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assert.assertTrue(
jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+
+ testIMapRemovedAfterJobComplete(jobMaster);
+ }
+
+ /**
+ * Asserts, retrying for up to 20 seconds, that the completed job's entries are gone from the
+ * job info, job state, state timestamp and owned slot IMaps. This covers the job-level keys
+ * and the per-pipeline and per-task-group keys.
+ *
+ * @param jobMaster the job master of the completed job, used to enumerate pipelines and tasks
+ */
+ private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
+ runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+ runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+ runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+ ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Assert.assertNull(runningJobInfoIMap.get(jobId));
+ Assert.assertNull(runningJobStateIMap.get(jobId));
+ Assert.assertNull(runningJobStateTimestampsIMap.get(jobId));
+
+ jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+ Assert.assertNull(runningJobStateIMap.get(pipeline.getPipelineLocation()));
+ Assert.assertNull(runningJobStateTimestampsIMap.get(pipeline.getPipelineLocation()));
+ // ownedSlotProfilesIMap is keyed by PipelineLocation, so it must be checked per
+ // pipeline; a lookup by jobId can never match and would always pass.
+ Assert.assertNull(ownedSlotProfilesIMap.get(pipeline.getPipelineLocation()));
+
+ pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+ Assert.assertNull(runningJobStateIMap.get(coordinator.getTaskGroupLocation()));
+ Assert.assertNull(runningJobStateTimestampsIMap.get(coordinator.getTaskGroupLocation()));
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ Assert.assertNull(runningJobStateIMap.get(task.getTaskGroupLocation()));
+ Assert.assertNull(runningJobStateTimestampsIMap.get(task.getTaskGroupLocation()));
+ });
+ });
+ });
}
}