You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/24 08:46:34 UTC
[incubator-seatunnel] branch st-engine updated: Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new fb5681979 Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)
fb5681979 is described below
commit fb5681979cfbfdc08053fb3c413bef58e9f44418
Author: Eric <ga...@gmail.com>
AuthorDate: Sat Sep 24 16:46:28 2022 +0800
Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)
---
.../engine/server/checkpoint/CheckpointCoordinator.java | 3 +++
.../engine/server/checkpoint/CheckpointManager.java | 1 -
.../engine/server/dag/physical/PhysicalVertex.java | 12 +++++++++++-
.../seatunnel/engine/server/dag/physical/SubPlan.java | 16 ++++++++++++++++
.../apache/seatunnel/engine/server/master/JobMaster.java | 1 +
5 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 10e47c172..6fb96a950 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -414,6 +414,9 @@ public class CheckpointCoordinator {
}
public boolean isCompleted() {
+ if (latestCompletedCheckpoint == null) {
+ return false;
+ }
return latestCompletedCheckpoint.getCheckpointType() == AUTO_SAVEPOINT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index c412cfed4..cde298c41 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -131,7 +131,6 @@ public class CheckpointManager {
switch (executionState) {
case FAILED:
case CANCELED:
- case CANCELING:
coordinatorMap.get(groupLocation.getPipelineId()).cleanPendingCheckpoint();
return;
default:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index e5ffb3361..afcaf4a3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
@@ -104,6 +105,8 @@ public class PhysicalVertex {
private TaskGroupImmutableInformation taskGroupImmutableInformation;
+ private JobMaster jobMaster;
+
public PhysicalVertex(int subTaskGroupIndex,
@NonNull ExecutorService executorService,
int parallelism,
@@ -238,6 +241,7 @@ public class PhysicalVertex {
private boolean turnToEndState(@NonNull ExecutionState endState) {
synchronized (this) {
+ jobMaster.getCheckpointManager().listenTaskGroup(taskGroupLocation, endState);
// consistency check
ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
if (currentState.isEndState()) {
@@ -246,7 +250,9 @@ public class PhysicalVertex {
return false;
}
if (!endState.isEndState()) {
- String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+ String message =
+ String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName,
+ endState);
LOGGER.warning(message);
return false;
}
@@ -403,4 +409,8 @@ public class PhysicalVertex {
public TaskGroupLocation getTaskGroupLocation() {
return taskGroupLocation;
}
+
+ public void setJobMaster(JobMaster jobMaster) {
+ this.jobMaster = jobMaster;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 824c3f002..d9672c6d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -337,6 +337,10 @@ public class SubPlan {
}
reset();
jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+ if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+ forcePipelineFinish();
+ return;
+ }
reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this);
if (reSchedulerPipelineFuture != null) {
reSchedulerPipelineFuture.join();
@@ -350,6 +354,16 @@ public class SubPlan {
}
}
+ /**
+ * If the job state in CheckpointManager is complete, we need to force this pipeline to finish
+ */
+ private void forcePipelineFinish() {
+ coordinatorVertexList.forEach(coordinator -> coordinator.updateTaskExecutionState(
+ new TaskExecutionState(coordinator.getTaskGroupLocation(), ExecutionState.FINISHED, null)));
+ physicalVertexList.forEach(task -> task.updateTaskExecutionState(
+ new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FINISHED, null)));
+ }
+
/**
* restore the pipeline state after new Master Node active
*/
@@ -384,6 +398,8 @@ public class SubPlan {
public void setJobMaster(JobMaster jobMaster) {
this.jobMaster = jobMaster;
+ coordinatorVertexList.forEach(coordinator -> coordinator.setJobMaster(jobMaster));
+ physicalVertexList.forEach(task -> task.setJobMaster(jobMaster));
}
public int getPipelineRestoreNum() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index bbfd5ee9f..cd593c112 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -184,6 +184,7 @@ public class JobMaster extends Thread {
public void handleCheckpointTimeout(long pipelineId) {
this.physicalPlan.getPipelineList().forEach(pipeline -> {
if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
+ LOGGER.warning(String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
pipeline.cancelPipeline();
}
});