You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/24 08:46:34 UTC

[incubator-seatunnel] branch st-engine updated: Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new fb5681979 Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)
fb5681979 is described below

commit fb5681979cfbfdc08053fb3c413bef58e9f44418
Author: Eric <ga...@gmail.com>
AuthorDate: Sat Sep 24 16:46:28 2022 +0800

    Sync pipeline state when restore a pipeline & Callback the CheckpointManager interface when the task is completed (#2813)
---
 .../engine/server/checkpoint/CheckpointCoordinator.java  |  3 +++
 .../engine/server/checkpoint/CheckpointManager.java      |  1 -
 .../engine/server/dag/physical/PhysicalVertex.java       | 12 +++++++++++-
 .../seatunnel/engine/server/dag/physical/SubPlan.java    | 16 ++++++++++++++++
 .../apache/seatunnel/engine/server/master/JobMaster.java |  1 +
 5 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 10e47c172..6fb96a950 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -414,6 +414,9 @@ public class CheckpointCoordinator {
     }
 
     public boolean isCompleted() {
+        if (latestCompletedCheckpoint == null) {
+            return false;
+        }
         return latestCompletedCheckpoint.getCheckpointType() == AUTO_SAVEPOINT_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index c412cfed4..cde298c41 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -131,7 +131,6 @@ public class CheckpointManager {
         switch (executionState) {
             case FAILED:
             case CANCELED:
-            case CANCELING:
                 coordinatorMap.get(groupLocation.getPipelineId()).cleanPendingCheckpoint();
                 return;
             default:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index e5ffb3361..afcaf4a3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
@@ -104,6 +105,8 @@ public class PhysicalVertex {
 
     private TaskGroupImmutableInformation taskGroupImmutableInformation;
 
+    private JobMaster jobMaster;
+
     public PhysicalVertex(int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
                           int parallelism,
@@ -238,6 +241,7 @@ public class PhysicalVertex {
 
     private boolean turnToEndState(@NonNull ExecutionState endState) {
         synchronized (this) {
+            jobMaster.getCheckpointManager().listenTaskGroup(taskGroupLocation, endState);
             // consistency check
             ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
             if (currentState.isEndState()) {
@@ -246,7 +250,9 @@ public class PhysicalVertex {
                 return false;
             }
             if (!endState.isEndState()) {
-                String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+                String message =
+                    String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName,
+                        endState);
                 LOGGER.warning(message);
                 return false;
             }
@@ -403,4 +409,8 @@ public class PhysicalVertex {
     public TaskGroupLocation getTaskGroupLocation() {
         return taskGroupLocation;
     }
+
+    public void setJobMaster(JobMaster jobMaster) {
+        this.jobMaster = jobMaster;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 824c3f002..d9672c6d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -337,6 +337,10 @@ public class SubPlan {
                 }
                 reset();
                 jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+                if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+                    forcePipelineFinish();
+                    return;
+                }
                 reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this);
                 if (reSchedulerPipelineFuture != null) {
                     reSchedulerPipelineFuture.join();
@@ -350,6 +354,16 @@ public class SubPlan {
         }
     }
 
+    /**
+     * If the job state in CheckpointManager is complete, we need force this pipeline finish
+     */
+    private void forcePipelineFinish() {
+        coordinatorVertexList.forEach(coordinator -> coordinator.updateTaskExecutionState(
+            new TaskExecutionState(coordinator.getTaskGroupLocation(), ExecutionState.FINISHED, null)));
+        physicalVertexList.forEach(task -> task.updateTaskExecutionState(
+            new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FINISHED, null)));
+    }
+
     /**
      * restore the pipeline state after new Master Node active
      */
@@ -384,6 +398,8 @@ public class SubPlan {
 
     public void setJobMaster(JobMaster jobMaster) {
         this.jobMaster = jobMaster;
+        coordinatorVertexList.forEach(coordinator -> coordinator.setJobMaster(jobMaster));
+        physicalVertexList.forEach(task -> task.setJobMaster(jobMaster));
     }
 
     public int getPipelineRestoreNum() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index bbfd5ee9f..cd593c112 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -184,6 +184,7 @@ public class JobMaster extends Thread {
     public void handleCheckpointTimeout(long pipelineId) {
         this.physicalPlan.getPipelineList().forEach(pipeline -> {
             if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
+                LOGGER.warning(String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
                 pipeline.cancelPipeline();
             }
         });