You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/08 12:37:42 UTC
[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Add handle checkpoint timeout (#2667)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 0ffc40fe8 [Feature][ST-Engine] Add handle checkpoint timeout (#2667)
0ffc40fe8 is described below
commit 0ffc40fe8456cc5e5646e708f47567a06b2539b0
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Sep 8 20:37:37 2022 +0800
[Feature][ST-Engine] Add handle checkpoint timeout (#2667)
* Add Checkpoint Timeout Handle
* trigger ci
* fix ci error
* fix ci error
* fix review problem
* Update seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
Co-authored-by: Zongwen Li <zo...@gmail.com>
* fix resource release error
* fix resource release error
* fix resource release error
Co-authored-by: Zongwen Li <zo...@gmail.com>
---
.../engine/client/JobConfigParserTest.java | 1 -
.../engine/server/dag/physical/PhysicalPlan.java | 132 +++++++++----
.../engine/server/dag/physical/PhysicalVertex.java | 39 +++-
.../engine/server/dag/physical/SubPlan.java | 69 ++++---
.../seatunnel/engine/server/master/JobMaster.java | 58 +++++-
.../engine/server/scheduler/JobScheduler.java | 11 +-
.../server/scheduler/PipelineBaseScheduler.java | 206 +++++++++++++--------
.../operation/GetTaskGroupAddressOperation.java | 2 +-
.../apache/seatunnel/engine/server/TestUtils.java | 64 +++++++
.../seatunnel/engine/server/dag/TaskTest.java | 28 +--
.../engine/server/master/JobMasterTest.java | 93 ++++++++++
11 files changed, 513 insertions(+), 190 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 39231a6df..a04932eeb 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -46,7 +46,6 @@ public class JobConfigParserTest {
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
-
Assert.assertEquals("LocalFile", actions.get(0).getName());
Assert.assertEquals(1, actions.get(0).getUpstream().size());
Assert.assertEquals("FakeSource", actions.get(0).getUpstream().get(0).getName());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index e55b0da65..296ad2e1f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,6 +53,11 @@ public class PhysicalPlan {
private final JobImmutableInformation jobImmutableInformation;
+ /**
+ * If the job or pipeline cancel by user, needRestore will be false
+ **/
+ private volatile boolean needRestore = true;
+
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
* execution graph transitioned into a certain state. The index into this array is the ordinal
@@ -68,6 +76,15 @@ public class PhysicalPlan {
private final String jobFullName;
+ private JobMaster jobMaster;
+
+ private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;
+
+ /**
+ * Whether we make the job end when pipeline turn to end state.
+ */
+ private boolean makeJobEndWhenPipelineEnded = true;
+
public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation jobImmutableInformation,
@@ -85,53 +102,77 @@ public class PhysicalPlan {
}
this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId());
+
+ pipelineSchedulerFutureMap = new HashMap<>(pipelineList.size());
+ }
+
+ public void setJobMaster(JobMaster jobMaster) {
+ this.jobMaster = jobMaster;
}
public void initStateFuture() {
- pipelineList.forEach(subPlan -> {
- PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
- future.whenComplete((v, t) -> {
- // We need not handle t, Because we will not return t from Pipeline
- if (PipelineState.CANCELED.equals(v)) {
+ pipelineList.forEach(subPlan -> addPipelineEndCallback(subPlan));
+ }
+
+ private void addPipelineEndCallback(SubPlan subPlan) {
+ PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
+ future.thenAcceptAsync(pipelineState -> {
+ try {
+ if (PipelineState.CANCELED.equals(pipelineState)) {
+ if (needRestore) {
+ restorePipeline(subPlan);
+ return;
+ }
canceledPipelineNum.incrementAndGet();
- } else if (PipelineState.FAILED.equals(v)) {
- LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
- failedPipelineNum.incrementAndGet();
- cancelJob();
- } else if (!PipelineState.FINISHED.equals(v)) {
- LOGGER.severe(
- "Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job.");
+ if (makeJobEndWhenPipelineEnded) {
+ LOGGER.info(
+ String.format("cancel job %s because makeJobEndWhenPipelineEnded is %s", jobFullName,
+ makeJobEndWhenPipelineEnded));
+ cancelJob();
+ }
+ LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
+ jobMaster.releasePipelineResource(subPlan.getPipelineId());
+ } else if (PipelineState.FAILED.equals(pipelineState)) {
+ if (needRestore) {
+ restorePipeline(subPlan);
+ return;
+ }
failedPipelineNum.incrementAndGet();
- cancelJob();
+ if (makeJobEndWhenPipelineEnded) {
+ cancelJob();
+ }
+ jobMaster.releasePipelineResource(subPlan.getPipelineId());
+ LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
}
+ } catch (Throwable e) {
+ // Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
+ LOGGER.severe(ExceptionUtils.getMessage(e));
+ }
- if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
- if (failedPipelineNum.get() > 0) {
- updateJobState(JobStatus.FAILING);
- } else if (canceledPipelineNum.get() > 0) {
- turnToEndState(JobStatus.CANCELED);
- } else {
- turnToEndState(JobStatus.FINISHED);
- }
- jobEndFuture.complete(jobStatus.get());
+ if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+ if (failedPipelineNum.get() > 0) {
+ updateJobState(JobStatus.FAILING);
+ } else if (canceledPipelineNum.get() > 0) {
+ turnToEndState(JobStatus.CANCELED);
+ } else {
+ turnToEndState(JobStatus.FINISHED);
}
- });
+ jobEndFuture.complete(jobStatus.get());
+ }
});
}
public void cancelJob() {
- if (!updateJobState(JobStatus.CREATED, JobStatus.CANCELED)) {
- // may be running, failing, failed, canceling , canceled, finished
- if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
- cancelRunningJob();
- } else {
- LOGGER.info(
- String.format("%s in a non cancellable state: %s, skip cancel", jobFullName, jobStatus.get()));
- }
+ if (jobStatus.get().isEndState()) {
+ LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, jobStatus.get()));
+ return;
}
+
+ updateJobState(jobStatus.get(), JobStatus.CANCELLING);
+ cancelJobPipelines();
}
- private void cancelRunningJob() {
+ private void cancelJobPipelines() {
List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
if (!pipeline.getPipelineState().get().isEndState() &&
!PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
@@ -176,6 +217,7 @@ public class PhysicalPlan {
throw new IllegalStateException(message);
}
+ LOGGER.info(String.format("%s turn to end state %s", jobFullName, endState));
jobStatus.set(endState);
stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
}
@@ -207,6 +249,28 @@ public class PhysicalPlan {
}
}
+ private void restorePipeline(SubPlan subPlan) {
+ try {
+ LOGGER.info(String.format("Restore pipeline %s", subPlan.getPipelineFullName()));
+ // We must ensure the scheduler complete and then can handle pipeline state change.
+ jobMaster.getScheduleFuture().join();
+
+ if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
+ pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+ }
+ subPlan.reset();
+ addPipelineEndCallback(subPlan);
+ pipelineSchedulerFutureMap.put(subPlan.getPipelineId(), jobMaster.reSchedulerPipeline(subPlan));
+ if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
+ pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+ }
+ } catch (Throwable e) {
+ LOGGER.severe(String.format("Restore pipeline %s error with exception: %s", subPlan.getPipelineFullName(),
+ ExceptionUtils.getMessage(e)));
+ subPlan.cancelPipeline();
+ }
+ }
+
public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
return new PassiveCompletableFuture<>(jobEndFuture);
}
@@ -222,4 +286,8 @@ public class PhysicalPlan {
public String getJobFullName() {
return jobFullName;
}
+
+ public void neverNeedRestore() {
+ this.needRestore = false;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 399c4850d..b90e21b8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -85,7 +85,7 @@ public class PhysicalVertex {
* When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
* in {@link SubPlan} whenComplete method will be called.
*/
- private final CompletableFuture<TaskExecutionState> taskFuture;
+ private CompletableFuture<TaskExecutionState> taskFuture;
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -144,6 +144,7 @@ public class PhysicalVertex {
}
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+ this.taskFuture = new CompletableFuture<>();
return new PassiveCompletableFuture<>(this.taskFuture);
}
@@ -229,18 +230,17 @@ public class PhysicalVertex {
this.pluginJarsUrls);
}
- private void turnToEndState(@NonNull ExecutionState endState) {
+ private boolean turnToEndState(@NonNull ExecutionState endState) {
// consistency check
if (executionState.get().isEndState()) {
- String message = "Task is trying to leave terminal state " + executionState.get();
- LOGGER.severe(message);
- throw new IllegalStateException(message);
+ String message = String.format("Task %s is already in terminal state %s", taskFullName, executionState.get());
+ LOGGER.warning(message);
+ return false;
}
-
if (!endState.isEndState()) {
- String message = "Need a end state, not " + endState;
- LOGGER.severe(message);
- throw new IllegalStateException(message);
+ String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+ LOGGER.warning(message);
+ return false;
}
LOGGER.info(String.format("%s turn to end state %s.",
@@ -248,6 +248,7 @@ public class PhysicalVertex {
endState));
executionState.set(endState);
stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
+ return true;
}
public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
@@ -329,6 +330,22 @@ public class PhysicalVertex {
}
}
+ private void resetExecutionState() {
+ if (!executionState.get().isEndState()) {
+ String message =
+ String.format("%s reset state failed, only end state can be reset, current is %s", getTaskFullName(),
+ executionState.get());
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+ executionState.set(ExecutionState.CREATED);
+ stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+ }
+
+ public void reset() {
+ resetExecutionState();
+ }
+
public AtomicReference<ExecutionState> getExecutionState() {
return executionState;
}
@@ -338,7 +355,9 @@ public class PhysicalVertex {
}
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
- turnToEndState(taskExecutionState.getExecutionState());
+ if (!turnToEndState(taskExecutionState.getExecutionState())) {
+ return;
+ }
if (taskExecutionState.getThrowable() != null) {
LOGGER.severe(String.format("%s end with state %s and Exception: %s",
this.taskFullName,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 7df5074b7..ff7264636 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class SubPlan {
@@ -71,7 +70,7 @@ public class SubPlan {
* Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
* whenComplete method will be called.
*/
- private final CompletableFuture<PipelineState> pipelineFuture;
+ private CompletableFuture<PipelineState> pipelineFuture;
private final ExecutorService executorService;
@@ -111,20 +110,21 @@ public class SubPlan {
addPhysicalVertexCallBack(m.initStateFuture());
});
+ this.pipelineFuture = new CompletableFuture<>();
return new PassiveCompletableFuture<>(pipelineFuture);
}
- private void addPhysicalVertexCallBack(CompletableFuture<TaskExecutionState> future) {
- future.whenComplete((v, t) -> {
+ private void addPhysicalVertexCallBack(PassiveCompletableFuture<TaskExecutionState> future) {
+ future.thenAcceptAsync(executionState -> {
// We need not handle t, Because we will not return t from PhysicalVertex
- if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
+ if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
canceledTaskNum.incrementAndGet();
- } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+ } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.",
this.getPipelineFullName()));
failedTaskNum.incrementAndGet();
cancelPipeline();
- } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
+ } else if (!ExecutionState.FINISHED.equals(executionState.getExecutionState())) {
LOGGER.severe(String.format(
"Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.",
this.getPipelineFullName()));
@@ -148,10 +148,6 @@ public class SubPlan {
});
}
- public void whenComplete(BiConsumer<? super PipelineState, ? super Throwable> action) {
- this.pipelineFuture.whenComplete(action);
- }
-
private void turnToEndState(@NonNull PipelineState endState) {
// consistency check
if (pipelineState.get().isEndState()) {
@@ -170,6 +166,18 @@ public class SubPlan {
stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
}
+ private void resetPipelineState() {
+ if (!pipelineState.get().isEndState()) {
+ String message = String.format("%s reset state failed, only end state can be reset, current is %s",
+ getPipelineFullName(), pipelineState.get());
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ pipelineState.set(PipelineState.CREATED);
+ stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+ }
+
public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
// consistency check
if (current.isEndState()) {
@@ -211,23 +219,15 @@ public class SubPlan {
}
public void cancelPipeline() {
- if (!updatePipelineState(PipelineState.CREATED, PipelineState.CANCELED) &&
- !updatePipelineState(PipelineState.SCHEDULED, PipelineState.CANCELED)) {
- // may be deploying, running, failed, canceling , canceled, finished
- if (updatePipelineState(PipelineState.DEPLOYING, PipelineState.CANCELING) ||
- updatePipelineState(PipelineState.RUNNING, PipelineState.CANCELING)) {
- cancelRunningPipeline();
- } else {
- LOGGER.info(
- String.format("%s in a non cancellable state: %s, skip cancel", pipelineFullName,
- pipelineState.get()));
- }
- } else {
- pipelineFuture.complete(PipelineState.CANCELED);
+ if (pipelineState.get().isEndState()) {
+ LOGGER.warning(String.format("%s is in end state %s, can not be cancel", pipelineFullName, pipelineState.get()));
+ return;
}
+ updatePipelineState(pipelineState.get(), PipelineState.CANCELING);
+ cancelPipelineTasks();
}
- private void cancelRunningPipeline() {
+ private void cancelPipelineTasks() {
List<CompletableFuture<Void>> coordinatorCancelList =
coordinatorVertexList.stream().map(coordinator -> cancelTask(coordinator)).filter(x -> x != null)
.collect(Collectors.toList());
@@ -259,9 +259,22 @@ public class SubPlan {
return null;
}
- public void failedWithNoEnoughResource() {
- LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
- cancelPipeline();
+ /**
+ * Before restore a pipeline, the pipeline must do reset
+ */
+ public void reset() {
+ resetPipelineState();
+ finishedTaskNum.set(0);
+ canceledTaskNum.set(0);
+ failedTaskNum.set(0);
+
+ coordinatorVertexList.forEach(coordinate -> {
+ coordinate.reset();
+ });
+
+ physicalVertexList.forEach(task -> {
+ task.reset();
+ });
}
public int getPipelineId() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ca91a15f1..c55bc07fc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -31,11 +31,14 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfigurat
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
+import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
@@ -73,11 +76,15 @@ public class JobMaster implements Runnable {
private JobImmutableInformation jobImmutableInformation;
+ private JobScheduler jobScheduler;
private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
+ private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
- @NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
+ @NonNull ExecutorService executorService,
+ @NonNull ResourceManager resourceManager) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
@@ -123,30 +130,53 @@ public class JobMaster implements Runnable {
@Override
public void run() {
try {
+ physicalPlan.setJobMaster(this);
+
PassiveCompletableFuture<JobStatus> jobStatusPassiveCompletableFuture =
physicalPlan.getJobEndCompletableFuture();
- jobStatusPassiveCompletableFuture.whenComplete((v, t) -> {
+ jobStatusPassiveCompletableFuture.thenAcceptAsync(jobStatus -> {
// We need not handle t, Because we will not return t from physicalPlan
- if (JobStatus.FAILING.equals(v)) {
+ if (JobStatus.FAILING.equals(jobStatus)) {
cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
}
jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
});
- ownedSlotProfiles.putAll(new PipelineBaseScheduler(physicalPlan, this).startScheduling());
+ jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+ scheduleFuture = CompletableFuture.runAsync(() -> jobScheduler.startScheduling(), executorService);
+ LOGGER.info(String.format("Job %s waiting for scheduler finished", physicalPlan.getJobFullName()));
+ scheduleFuture.join();
+ LOGGER.info(String.format("%s scheduler finished", physicalPlan.getJobFullName()));
} catch (Throwable e) {
LOGGER.severe(String.format("Job %s (%s) run error with: %s",
physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
physicalPlan.getJobImmutableInformation().getJobId(),
ExceptionUtils.getMessage(e)));
// try to cancel job
- physicalPlan.cancelJob();
+ cancelJob();
} finally {
jobMasterCompleteFuture.join();
}
}
+ public void handleCheckpointTimeout(long pipelineId) {
+ this.physicalPlan.getPipelineList().forEach(pipeline -> {
+ if (pipeline.getPipelineId() == pipelineId) {
+ pipeline.cancelPipeline();
+ }
+ });
+ }
+
+ public CompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
+ return jobScheduler.reSchedulerPipeline(subPlan);
+ }
+
+ public void releasePipelineResource(int pipelineId) {
+ resourceManager.releaseResources(jobImmutableInformation.getJobId(),
+ Lists.newArrayList(ownedSlotProfiles.get(pipelineId).values()));
+ }
+
public void cleanJob() {
// TODO Add some job clean operation
}
@@ -154,7 +184,7 @@ public class JobMaster implements Runnable {
public Address queryTaskGroupAddress(long taskGroupId) {
for (Integer pipelineId : ownedSlotProfiles.keySet()) {
Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineId).keySet().stream()
- .filter(physicalVertex -> physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+ .filter(task -> task.getTaskGroupLocation().getTaskGroupId() == taskGroupId)
.findFirst();
if (currentVertex.isPresent()) {
return ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
@@ -164,7 +194,8 @@ public class JobMaster implements Runnable {
}
public void cancelJob() {
- this.physicalPlan.cancelJob();
+ physicalPlan.neverNeedRestore();
+ physicalPlan.cancelJob();
}
public ResourceManager getResourceManager() {
@@ -214,4 +245,17 @@ public class JobMaster implements Runnable {
});
});
}
+
+ public Map<Integer, Map<PhysicalVertex, SlotProfile>> getOwnedSlotProfiles() {
+ return ownedSlotProfiles;
+ }
+
+ public void setOwnedSlotProfiles(@NonNull Integer pipelineId,
+ @NonNull Map<PhysicalVertex, SlotProfile> pipelineOwnedSlotProfiles) {
+ ownedSlotProfiles.put(pipelineId, pipelineOwnedSlotProfiles);
+ }
+
+ public CompletableFuture<Void> getScheduleFuture() {
+ return scheduleFuture;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index 25aece4a4..dbfcc235e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -17,11 +17,14 @@
package org.apache.seatunnel.engine.server.scheduler;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
-import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
-import java.util.Map;
+import lombok.NonNull;
+
+import java.util.concurrent.CompletableFuture;
public interface JobScheduler {
- Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling();
+ CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan);
+
+ void startScheduling();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 1727e06ba..a3d27cd75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -18,20 +18,18 @@
package org.apache.seatunnel.engine.server.scheduler;
import org.apache.seatunnel.engine.common.exception.JobException;
-import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
-import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
-import com.google.common.collect.Lists;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
@@ -41,8 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class PipelineBaseScheduler implements JobScheduler {
@@ -61,84 +57,103 @@ public class PipelineBaseScheduler implements JobScheduler {
}
@Override
- public Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling() {
- Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles = new ConcurrentHashMap<>();
+ public void startScheduling() {
if (physicalPlan.turnToRunning()) {
- List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
- if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
- handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
- return null;
- }
- Map<PhysicalVertex, SlotProfile> slotProfiles;
- try {
- slotProfiles = applyResourceForPipeline(pipeline);
- ownedSlotProfiles.put(pipeline.getPipelineId(), slotProfiles);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- pipeline.whenComplete((state, error) -> {
- releasePipelineResource(Lists.newArrayList(slotProfiles.values()));
- });
- // deploy pipeline
- return CompletableFuture.supplyAsync(() -> {
- // TODO before deploy should check slotProfiles is exist, because it maybe can't use when retry.
- deployPipeline(pipeline, slotProfiles);
- return null;
- });
- }).filter(Objects::nonNull).collect(Collectors.toList());
+ List<CompletableFuture<Void>> collect =
+ physicalPlan.getPipelineList()
+ .stream()
+ .map(pipeline -> schedulerPipeline(pipeline))
+ .filter(Objects::nonNull).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
collect.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
} catch (Exception e) {
- // cancel pipeline and throw an exception
- physicalPlan.cancelJob();
throw new RuntimeException(e);
}
} else if (!JobStatus.CANCELED.equals(physicalPlan.getJobStatus())) {
throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
physicalPlan.getJobStatus()));
}
- return ownedSlotProfiles;
}
- private void releasePipelineResource(List<SlotProfile> slotProfiles) {
- if (null == slotProfiles || slotProfiles.isEmpty()) {
- return;
+ // This method cannot throw an exception
+ private CompletableFuture<Void> schedulerPipeline(SubPlan pipeline) {
+ try {
+ if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
+ handlePipelineStateTurnError(pipeline, PipelineState.SCHEDULED);
+ return null;
+ }
+
+ Map<PhysicalVertex, SlotProfile> slotProfiles =
+ getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles().get(pipeline.getPipelineId()));
+
+ // To ensure release pipeline resource after new master node active, we need store slotProfiles first and then deploy tasks.
+ jobMaster.setOwnedSlotProfiles(pipeline.getPipelineId(), slotProfiles);
+ // deploy pipeline
+ return CompletableFuture.runAsync(() -> {
+ deployPipeline(pipeline, slotProfiles);
+ });
+ } catch (Exception e) {
+ pipeline.cancelPipeline();
+ return null;
}
- resourceManager.releaseResources(jobId, slotProfiles).join();
}
- private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
- try {
- Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
- Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
- // TODO If there is no enough resources for tasks, we need add some wait profile
- subPlan.getCoordinatorVertexList().forEach(coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
-
- subPlan.getPhysicalVertexList().forEach(task -> futures.put(task, applyResourceForTask(task)));
-
- for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
- try {
- slotProfiles.put(future.getKey(), future.getValue().get());
- } catch (NoEnoughResourceException e) {
- // TODO custom exception with pipelineID, jobName etc.
- throw new JobNoEnoughResourceException("No enough resource to execute pipeline", e);
- }
+ private Map<PhysicalVertex, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
+ Map<PhysicalVertex, SlotProfile> ownedSlotProfiles) {
+ if (ownedSlotProfiles == null || ownedSlotProfiles.isEmpty()) {
+ return applyResourceForPipeline(pipeline);
+ }
+
+ // TODO ensure the slots still exist and is owned by this pipeline
+ for (Map.Entry<PhysicalVertex, SlotProfile> entry : ownedSlotProfiles.entrySet()) {
+ if (entry.getValue() == null) {
+ ownedSlotProfiles.put(entry.getKey(), applyResourceForTask(entry.getKey()).join());
+ } else {
+ entry.getKey().updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
}
- return slotProfiles;
- } catch (JobNoEnoughResourceException | ExecutionException | InterruptedException e) {
- LOGGER.severe(e);
- throw e;
}
+ return ownedSlotProfiles;
+ }
+
+ private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
+ Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+ Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+ // TODO If there is no enough resources for tasks, we need add some wait profile
+ subPlan.getCoordinatorVertexList()
+ .forEach(
+ coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+
+ subPlan.getPhysicalVertexList()
+ .forEach(task -> futures.put(task, applyResourceForTask(task)));
+
+ for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+ slotProfiles.put(future.getKey(),
+ future.getValue() == null ? null : future.getValue().join());
+ }
+ return slotProfiles;
}
private CompletableFuture<SlotProfile> applyResourceForTask(PhysicalVertex task) {
- if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
- // TODO custom resource size
- return resourceManager.applyResource(jobId, new ResourceProfile());
- } else {
- handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+ try { // Returns null instead of a future when the task is canceled, is in any other unexpected state, or the request throws.
+ if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+ // TODO custom resource size
+ return resourceManager.applyResource(jobId, new ResourceProfile());
+ } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) || // NOTE(review): the state is read separately here, on the next line and in the message below; reading it once into a local would give a consistent snapshot
+ ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+ LOGGER.info(
+ String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
+ ExecutionState.SCHEDULED));
+ return null;
+ } else {
+ makeTaskFailed(task, // any other state is reported to the JobMaster as FAILED instead of being thrown
+ new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+ task.getTaskFullName(), task.getExecutionState().get())));
+ return null;
+ }
+ } catch (Throwable e) { // NOTE(review): catching Throwable also traps Errors; they are only reported via makeTaskFailed
+ makeTaskFailed(task, e);
return null;
}
}
@@ -146,14 +161,23 @@ public class PipelineBaseScheduler implements JobScheduler {
private CompletableFuture<Void> deployTask(PhysicalVertex task, SlotProfile slotProfile) {
if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
// deploy is a time-consuming operation, so we do it async
- return CompletableFuture.supplyAsync(() -> {
+ return CompletableFuture.runAsync(() -> { // NOTE(review): no executor is passed, so this runs on ForkJoinPool.commonPool(); a dedicated executor may suit long deploys better
task.deploy(slotProfile);
- return null;
});
+ } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) || // canceled tasks are skipped (null is returned); any other state fails the task below
+ ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+ LOGGER.info(
+ String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
+ return null;
} else {
- handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
+ jobMaster.updateTaskExecutionState( // NOTE(review): this is the same call makeTaskFailed(task, new JobException(...)) makes; using that helper would keep failure reporting consistent
+ new TaskExecutionState(
+ task.getTaskGroupLocation(),
+ ExecutionState.FAILED,
+ new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+ task.getTaskFullName(), task.getExecutionState().get()))));
+ return null;
}
- return null;
}
private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
@@ -161,7 +185,8 @@ public class PipelineBaseScheduler implements JobScheduler {
try {
List<CompletableFuture<?>> deployCoordinatorFuture =
- pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+ pipeline.getCoordinatorVertexList().stream()
+ .map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator))) // deployTask returns null for skipped or failed tasks; those are filtered out below
.filter(Objects::nonNull).collect(Collectors.toList());
List<CompletableFuture<?>> deployTaskFuture =
@@ -178,35 +203,52 @@ public class PipelineBaseScheduler implements JobScheduler {
pipeline.getPipelineState().get()));
}
} catch (Exception e) {
- // cancel pipeline and throw an exception
- pipeline.cancelPipeline();
- throw new RuntimeException(e);
+ makePipelineFailed(pipeline, e); // report every vertex as FAILED instead of cancelling the pipeline and rethrowing
}
+ } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
+ PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+ // the pipeline is canceled or being canceled: skip deployment instead of failing it
+ LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get(), PipelineState.DEPLOYING));
} else {
- handlePipelineStateUpdateError(pipeline, PipelineState.DEPLOYING);
+ makePipelineFailed(pipeline, new JobException( // any other state fails every vertex of the pipeline
+ String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get())));
}
}
- private void handlePipelineStateUpdateError(SubPlan pipeline, PipelineState targetState) {
+ @Override
+ public CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan) { // Delegates unchanged to schedulerPipeline.
+ return schedulerPipeline(subPlan);
+ }
+
+ private void handlePipelineStateTurnError(SubPlan pipeline, PipelineState targetState) { // NOTE(review): no caller is visible in this chunk (the new code uses makePipelineFailed); check whether it is still used
if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
// may be canceled
- LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState().get(), targetState));
+ LOGGER.info(
+ String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get(), targetState));
} else {
throw new JobException(
- String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
+ String.format("%s turn to a unexpected state: %s, stop scheduler job",
+ pipeline.getPipelineFullName(),
pipeline.getPipelineState().get()));
}
}
- private void handleTaskStateUpdateError(PhysicalVertex task, ExecutionState targetState) {
- if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
- ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
- LOGGER.info(String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), targetState));
- } else {
- throw new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
- task.getTaskFullName(), task.getExecutionState().get()));
- }
+ private void makePipelineFailed(@NonNull SubPlan pipeline, Throwable e) { // Reports every coordinator and task vertex of the pipeline as FAILED with cause e.
+ pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+ makeTaskFailed(coordinator, e);
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ makeTaskFailed(task, e);
+ });
+ }
+
+ private void makeTaskFailed(@NonNull PhysicalVertex task, Throwable e) { // Sends a FAILED TaskExecutionState for the task's group location to the JobMaster (reports the failure instead of throwing it).
+ jobMaster.updateTaskExecutionState(
+ new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FAILED, e));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 1c4a6605e..2f6e39ee4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -50,7 +50,7 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
response = RetryUtils.retryWithException(() -> server.getJobMaster(taskLocation.getJobId())
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof IllegalArgumentException, Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof Exception, Constant.OPERATION_RETRY_SLEEP)); // NOTE(review): widened from IllegalArgumentException to any Exception, so every such failure is now retried
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
new file mode 100644
index 000000000..518f409c3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+
+import com.google.common.collect.Sets;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class TestUtils { // Shared fixtures for engine-server tests. NOTE(review): utility class - consider making it final with a private constructor
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static LogicalDag getTestLogicalDag() throws MalformedURLException { // Builds a two-vertex DAG FakeSource -> ConsoleSink, both with parallelism 3.
+ IdGenerator idGenerator = new IdGenerator();
+ FakeSource fakeSource = new FakeSource();
+ fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext()); // global context: the callers in this patch set its job mode before calling
+
+ Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
+ Sets.newHashSet(new URL("file:///fake.jar")));
+ fake.setParallelism(3);
+ LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
+
+ ConsoleSink consoleSink = new ConsoleSink();
+ consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
+ Sets.newHashSet(new URL("file:///console.jar")));
+ console.setParallelism(3);
+ LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3);
+
+ LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex); // the single edge: fake -> console
+
+ LogicalDag logicalDag = new LogicalDag();
+ logicalDag.addLogicalVertex(fakeVertex);
+ logicalDag.addLogicalVertex(consoleVertex);
+ logicalDag.addEdge(edge);
+ return logicalDag;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 6baf2d98b..125776a29 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils; // NOTE(review): imports used only by the removed DAG-building code (IdGenerator, FakeSource, ConsoleSink, Sets, SourceAction, SinkAction, ...) may now be unused here
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
@@ -49,37 +50,14 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
@Test
public void testTask() throws MalformedURLException {
-
- IdGenerator idGenerator = new IdGenerator();
-
SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
- FakeSource fakeSource = new FakeSource();
- fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
-
- Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
- Sets.newHashSet(new URL("file:///fake.jar")));
- fake.setParallelism(3);
- LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
-
- ConsoleSink consoleSink = new ConsoleSink();
- consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
- Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
- Sets.newHashSet(new URL("file:///console.jar")));
- console.setParallelism(3);
- LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3);
-
- LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);
-
- LogicalDag logicalDag = new LogicalDag();
- logicalDag.addLogicalVertex(fakeVertex);
- logicalDag.addLogicalVertex(consoleVertex);
- logicalDag.addEdge(edge);
+ LogicalDag testLogicalDag = TestUtils.getTestLogicalDag(); // shared fake -> console fixture, also used by JobMasterTest
JobConfig config = new JobConfig();
config.setName("test");
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
- nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.submitJob(jobImmutableInformation.getJobId(),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
new file mode 100644
index 000000000..227c347e7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import static org.awaitility.Awaitility.await;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link JobMaster}: the job must be RUNNING again after a checkpoint timeout, and must end as CANCELED after cancelJob().
+ */
+public class JobMasterTest extends AbstractSeaTunnelServerTest {
+ private Long jobId; // NOTE(review): FlakeIdGenerator.newId() returns a primitive long; a long field would avoid boxing
+
+ @Before
+ public void before() {
+ super.before();
+ jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId(); // a new id from the cluster's flake id generator for every test
+ }
+
+ @Test
+ public void testHandleCheckpointTimeout() throws Exception {
+ SeaTunnelContext.getContext().setJobMode(JobMode.STREAMING); // presumably streaming so the job keeps running until it is canceled - confirm
+ LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+ JobConfig config = new JobConfig();
+ config.setName("test_checkpoint_timeout");
+
+ JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
+ nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
+
+ Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture = server.submitJob(jobId, data);
+ voidPassiveCompletableFuture.join();
+
+ JobMaster jobMaster = server.getJobMaster(jobId);
+
+ // wait for the job status to become RUNNING
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assert.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
+
+ // simulate a checkpoint timeout (the argument is presumably a pipeline id - confirm against JobMaster)
+ jobMaster.handleCheckpointTimeout(1);
+
+ // handleCheckpointTimeout is asynchronous, so sleep 5s before checking that the job status has become RUNNING again
+ Thread.sleep(5000); // NOTE(review): a fixed sleep slows the test and can be flaky; awaiting an observable state change would be more robust
+
+ // the job must be RUNNING again after the timeout has been handled
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assert.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
+
+ PassiveCompletableFuture<JobStatus> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
+ // cancel the job
+ jobMaster.cancelJob();
+
+ // the job master's completion future must finish with CANCELED
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assert.assertTrue(
+ jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+ }
+}