You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/23 12:28:08 UTC

[incubator-seatunnel] branch st-engine updated: [engine][checkpoint] checkpoint base classes (#2478)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new a01e591a2 [engine][checkpoint] checkpoint base classes (#2478)
a01e591a2 is described below

commit a01e591a2ad54bee2c9fee1842cf3d3c32d27b01
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Tue Aug 23 20:28:03 2022 +0800

    [engine][checkpoint] checkpoint base classes (#2478)
    
    * [engine] checkpoint base classes
    
    * [engine][checkpoint] add checkpoint operation
    
    * Serialization error
    
    * [engine] fixed checkpoint operation id
    
    * checkstyle
---
 .../seatunnel/engine/server/SeaTunnelServer.java   |   8 ++
 .../server/checkpoint/CheckpointCoordinator.java   |  52 ++++++++
 .../CheckpointCoordinatorConfiguration.java        | 143 +++++++++++++++++++++
 .../server/checkpoint/CheckpointManager.java       |  51 ++++++++
 .../engine/server/checkpoint/CheckpointPlan.java   |  25 ++++
 .../server/checkpoint/CompletedCheckpoint.java     |  72 +++++++++++
 .../server/checkpoint/PendingCheckpoint.java       |  72 +++++++++++
 .../server/checkpoint/SubtaskStatistics.java       |  53 ++++++++
 .../engine/server/checkpoint/TaskStatistics.java   |  81 ++++++++++++
 .../server/dag/physical/PhysicalPlanGenerator.java |   7 +-
 .../{PhysicalPlanUtils.java => PlanUtils.java}     |  16 ++-
 .../engine/server/execution/TaskInfo.java          |  56 ++++++++
 .../seatunnel/engine/server/master/JobMaster.java  |  14 +-
 ...rOperation.java => CheckpointAckOperation.java} |  31 +++--
 ...ation.java => CheckpointFinishedOperation.java} |  31 +++--
 .../operation/CheckpointTriggerOperation.java      |   2 +-
 ...rOperation.java => TaskCompletedOperation.java} |  26 ++--
 .../serializable/OperationDataSerializerHook.java  |  19 ++-
 .../operation/source/SourceRegisterOperation.java  |   2 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |   6 +-
 20 files changed, 719 insertions(+), 48 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 3b0cf8dd6..23f7613e2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -58,6 +58,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     private final SeaTunnelConfig seaTunnelConfig;
 
+    /**
+     * key: job id;
+     * <br> value: job master;
+     */
     private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
 
     public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
@@ -73,6 +77,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return this.taskExecutionService;
     }
 
+    /**
+     * Looks up the job master stored in the running-job-master map for a job id.
+     *
+     * @param jobId key into the running-job-master map
+     * @return the mapped {@link JobMaster}, or {@code null} when the map has no entry for {@code jobId}
+     */
+    public JobMaster getJobMaster(Long jobId) {
+        final JobMaster jobMaster = runningJobMasterMap.get(jobId);
+        return jobMaster;
+    }
+
     @Override
     public void init(NodeEngine engine, Properties hzProperties) {
         this.nodeEngine = (NodeEngineImpl) engine;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
new file mode 100644
index 000000000..6046380f4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Used to coordinate all checkpoints of a pipeline.
+ * <p>
+ * Holds the {@link CheckpointPlan} and {@link CheckpointCoordinatorConfiguration} it was created
+ * with, plus insertion-ordered maps of the {@link Checkpoint} implementations
+ * {@link PendingCheckpoint} and {@link CompletedCheckpoint}.
+ * </p>
+ */
+public class CheckpointCoordinator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+
+    private final CheckpointPlan plan;
+
+    private final CheckpointCoordinatorConfiguration config;
+
+    /** Pending checkpoints in insertion order; key is presumably the checkpoint id (TODO confirm). */
+    private final Map<Long, PendingCheckpoint> pendingCheckpoints = new LinkedHashMap<>();
+
+    /** Completed checkpoints in insertion order; key is presumably the checkpoint id (TODO confirm). */
+    private final Map<Long, CompletedCheckpoint> completedCheckpoints = new LinkedHashMap<>();
+
+    /**
+     * Creates a coordinator whose pending and completed checkpoint maps start out empty.
+     *
+     * @param plan   the checkpoint plan to keep
+     * @param config the coordinator configuration to keep
+     */
+    public CheckpointCoordinator(CheckpointPlan plan,
+                                 CheckpointCoordinatorConfiguration config) {
+        this.plan = plan;
+        this.config = config;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
new file mode 100644
index 000000000..bfb936831
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class CheckpointCoordinatorConfiguration implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public static final long MINIMAL_CHECKPOINT_TIME = 10;
+
+    private final long checkpointInterval;
+
+    private final long checkpointTimeout;
+
+    private final int maxConcurrentCheckpoints;
+
+    private final int tolerableFailureCheckpoints;
+
+    private CheckpointCoordinatorConfiguration(long checkpointInterval,
+                                               long checkpointTimeout,
+                                               int maxConcurrentCheckpoints,
+                                               int tolerableFailureCheckpoints) {
+        this.checkpointInterval = checkpointInterval;
+        this.checkpointTimeout = checkpointTimeout;
+        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+        this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+    }
+
+    public long getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    public long getCheckpointTimeout() {
+        return checkpointTimeout;
+    }
+
+    public int getMaxConcurrentCheckpoints() {
+        return maxConcurrentCheckpoints;
+    }
+
+    public int getTolerableFailureCheckpoints() {
+        return tolerableFailureCheckpoints;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CheckpointCoordinatorConfiguration that = (CheckpointCoordinatorConfiguration) o;
+        return checkpointInterval == that.checkpointInterval
+            && checkpointTimeout == that.checkpointTimeout
+            && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints
+            && tolerableFailureCheckpoints == that.tolerableFailureCheckpoints;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            checkpointInterval,
+            checkpointTimeout,
+            maxConcurrentCheckpoints,
+            tolerableFailureCheckpoints);
+    }
+
+    @Override
+    public String toString() {
+        return "CheckpointCoordinatorConfiguration{" +
+            "checkpointInterval=" + checkpointInterval +
+            ", checkpointTimeout=" + checkpointTimeout +
+            ", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
+            ", tolerableFailureCheckpoints=" + tolerableFailureCheckpoints +
+            '}';
+    }
+
+    public static CheckpointCoordinatorConfiguration.Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private long checkpointInterval = MINIMAL_CHECKPOINT_TIME;
+        private long checkpointTimeout = MINIMAL_CHECKPOINT_TIME;
+        private int maxConcurrentCheckpoints = 1;
+        private int tolerableFailureCheckpoints = 0;
+
+        private Builder() {
+        }
+
+        public Builder checkpointInterval(long checkpointInterval) {
+            checkArgument(checkpointInterval < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 mills.");
+            this.checkpointInterval = checkpointInterval;
+            return this;
+        }
+
+        public Builder checkpointTimeout(long checkpointTimeout) {
+            checkArgument(checkpointTimeout < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 mills.");
+            this.checkpointTimeout = checkpointTimeout;
+            return this;
+        }
+
+        public Builder maxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+            checkArgument(maxConcurrentCheckpoints < 1, "The minimum number of concurrent checkpoints is 1.");
+            this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+            return this;
+        }
+
+        public Builder tolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
+            checkArgument(maxConcurrentCheckpoints < 0, "The number of tolerance failed checkpoints must be a natural number.");
+            this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+            return this;
+        }
+
+        public CheckpointCoordinatorConfiguration build() {
+            return new CheckpointCoordinatorConfiguration(
+                checkpointInterval,
+                checkpointTimeout,
+                maxConcurrentCheckpoints,
+                tolerableFailureCheckpoints);
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
new file mode 100644
index 000000000..16da70a8b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to manage all checkpoints for a job.
+ * <p>
+ * Keeps, per pipeline id, the {@link CheckpointPlan} it was given and a
+ * {@link CheckpointCoordinator} lookup table.
+ * </p>
+ */
+public class CheckpointManager {
+
+    /**
+     * key: the pipeline id of the job;
+     * <br> value: the checkpoint plan of the pipeline;
+     */
+    private final Map<Long, CheckpointPlan> checkpointPlanMap;
+
+    /**
+     * key: the pipeline id of the job;
+     * <br> value: the checkpoint coordinator of the pipeline;
+     */
+    private final Map<Long, CheckpointCoordinator> coordinatorMap;
+
+    /**
+     * Stores the given plans and creates an empty coordinator map whose initial capacity is the
+     * number of plans.
+     *
+     * @param checkpointPlanMap checkpoint plans keyed by pipeline id; must not be {@code null}
+     */
+    public CheckpointManager(Map<Long, CheckpointPlan> checkpointPlanMap) {
+        final int pipelineCount = checkpointPlanMap.size();
+        this.checkpointPlanMap = checkpointPlanMap;
+        this.coordinatorMap = new HashMap<>(pipelineCount);
+    }
+
+    /**
+     * @param pipelineId the pipeline id of the job
+     * @return the coordinator stored for that pipeline, or {@code null} if there is none
+     */
+    public CheckpointCoordinator getCheckpointCoordinator(long pipelineId) {
+        final CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
+        return coordinator;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
new file mode 100644
index 000000000..6a144668d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+/**
+ * Checkpoint plan information.
+ * <p>
+ * Currently an empty placeholder: the class declares no fields or methods.
+ * </p>
+ */
+public class CheckpointPlan {
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
new file mode 100644
index 000000000..3b56f5129
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Serializable {@link Checkpoint} implementation that stores the job, pipeline and checkpoint
+ * ids, the trigger and completion timestamps, and a map of {@link TaskState}.
+ */
+public class CompletedCheckpoint implements Checkpoint, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long jobId;
+
+    private final long pipelineId;
+
+    private final long checkpointId;
+
+    /** Value reported by {@link #getCheckpointTimestamp()}. */
+    private final long triggerTimestamp;
+
+    private final long completedTimestamp;
+
+    /** Task states keyed by a {@code Long}; presumably the task id (TODO confirm). */
+    private final Map<Long, TaskState> taskStates;
+
+    /**
+     * Stores all arguments as given; the {@code taskStates} map is kept by reference, not copied.
+     */
+    public CompletedCheckpoint(long jobId,
+                               long pipelineId,
+                               long checkpointId,
+                               long triggerTimestamp,
+                               long completedTimestamp,
+                               Map<Long, TaskState> taskStates) {
+        this.taskStates = taskStates;
+        this.completedTimestamp = completedTimestamp;
+        this.triggerTimestamp = triggerTimestamp;
+        this.checkpointId = checkpointId;
+        this.pipelineId = pipelineId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    @Override
+    public long getPipelineId() {
+        return pipelineId;
+    }
+
+    @Override
+    public long getJobId() {
+        return jobId;
+    }
+
+    /**
+     * @return the trigger timestamp (not the completion timestamp)
+     */
+    @Override
+    public long getCheckpointTimestamp() {
+        return triggerTimestamp;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
new file mode 100644
index 000000000..1c999d107
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link Checkpoint} implementation that stores the job, pipeline and checkpoint ids, the
+ * trigger timestamp, the set of tasks that have not acknowledged yet, and a map of
+ * {@link TaskState}.
+ */
+public class PendingCheckpoint implements Checkpoint {
+
+    private final long jobId;
+
+    private final long pipelineId;
+
+    private final long checkpointId;
+
+    /** Value reported by {@link #getCheckpointTimestamp()}. */
+    private final long triggerTimestamp;
+
+    /** Tasks still awaited; elements are {@code Long}s, presumably task ids (TODO confirm). */
+    private final Set<Long> notYetAcknowledgedTasks;
+
+    /** Task states keyed by a {@code Long}; presumably the task id (TODO confirm). */
+    private final Map<Long, TaskState> taskStates;
+
+    /**
+     * Stores all arguments as given; the set and the map are kept by reference, not copied.
+     */
+    public PendingCheckpoint(long jobId,
+                             long pipelineId,
+                             long checkpointId,
+                             long triggerTimestamp,
+                             Set<Long> notYetAcknowledgedTasks,
+                             Map<Long, TaskState> taskStates) {
+        this.taskStates = taskStates;
+        this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+        this.triggerTimestamp = triggerTimestamp;
+        this.checkpointId = checkpointId;
+        this.pipelineId = pipelineId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    @Override
+    public long getPipelineId() {
+        return pipelineId;
+    }
+
+    @Override
+    public long getJobId() {
+        return jobId;
+    }
+
+    /**
+     * @return the timestamp at which this checkpoint was triggered
+     */
+    @Override
+    public long getCheckpointTimestamp() {
+        return triggerTimestamp;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
new file mode 100644
index 000000000..d098c1783
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable checkpoint statistics of a single subtask: its index, the timestamp
+ * of its acknowledgement and the size of its checkpointed state.
+ */
+public class SubtaskStatistics implements Serializable {
+
+    private static final long serialVersionUID = 8928594531621862214L;
+
+    /** Index of the subtask these statistics belong to. */
+    private final int subtaskIndex;
+
+    /** Timestamp when the ack from this subtask was received at the coordinator. */
+    private final long ackTimestamp;
+
+    /** Size of the checkpointed state at this subtask. */
+    private final long stateSize;
+
+    /**
+     * @param subtaskIndex index of the subtask
+     * @param ackTimestamp timestamp of the subtask's acknowledgement
+     * @param stateSize    size of the subtask's checkpointed state
+     */
+    public SubtaskStatistics(int subtaskIndex, long ackTimestamp, long stateSize) {
+        this.stateSize = stateSize;
+        this.ackTimestamp = ackTimestamp;
+        this.subtaskIndex = subtaskIndex;
+    }
+
+    /** @return the subtask index given at construction */
+    public int getSubtaskIndex() {
+        return this.subtaskIndex;
+    }
+
+    /** @return the acknowledgement timestamp given at construction */
+    public long getAckTimestamp() {
+        return this.ackTimestamp;
+    }
+
+    /** @return the state size given at construction */
+    public long getStateSize() {
+        return this.stateSize;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
new file mode 100644
index 000000000..4956c9d1d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Checkpoint statistics of one task, assembled from the {@link SubtaskStatistics} reported for
+ * its subtasks.
+ */
+public class TaskStatistics {
+    /** ID of the task the statistics belong to. */
+    private final Long jobVertexId;
+
+    /** One slot per subtask index; a slot stays {@code null} until statistics for it are reported. */
+    private final SubtaskStatistics[] subtaskStats;
+
+    /** Number of slots of {@link #subtaskStats} that have been filled. */
+    private int numAcknowledgedSubtasks;
+
+    /** Statistics most recently accepted by {@link #reportSubtaskStats}; {@code null} before the first. */
+    private SubtaskStatistics latestAckedSubtaskStatistics;
+
+    /**
+     * @param jobVertexId ID of the task; must not be {@code null}
+     * @param parallelism number of subtasks; must be greater than zero
+     * @throws NullPointerException     if {@code jobVertexId} is {@code null}
+     * @throws IllegalArgumentException if {@code parallelism} is not positive
+     */
+    TaskStatistics(Long jobVertexId, int parallelism) {
+        this.jobVertexId = checkNotNull(jobVertexId, "JobVertexID");
+        checkArgument(parallelism > 0, "the parallelism of task <= 0");
+        this.subtaskStats = new SubtaskStatistics[parallelism];
+    }
+
+    /**
+     * Records the statistics of one subtask.
+     *
+     * @param subtask statistics to record; must not be {@code null}
+     * @return {@code true} if the statistics were stored; {@code false} if the subtask index is
+     *     out of range or statistics for that index were already stored
+     * @throws NullPointerException if {@code subtask} is {@code null}
+     */
+    boolean reportSubtaskStats(SubtaskStatistics subtask) {
+        checkNotNull(subtask, "Subtask stats");
+        int subtaskIndex = subtask.getSubtaskIndex();
+
+        if (subtaskIndex < 0 || subtaskIndex >= subtaskStats.length) {
+            return false;
+        }
+
+        if (subtaskStats[subtaskIndex] == null) {
+            subtaskStats[subtaskIndex] = subtask;
+            // Remember the newest accepted report; without this the "latest" getters below
+            // would keep returning null / -1 forever.
+            latestAckedSubtaskStatistics = subtask;
+            numAcknowledgedSubtasks++;
+            return true;
+        } else {
+            // Only the first report per subtask index is kept.
+            return false;
+        }
+    }
+
+    /**
+     * @return The latest acknowledged subtask stats or <code>null</code> if none was acknowledged
+     *     yet.
+     */
+    public SubtaskStatistics getLatestAcknowledgedSubtaskStatistics() {
+        return latestAckedSubtaskStatistics;
+    }
+
+    /**
+     * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code> if none was
+     *     acknowledged yet.
+     */
+    public long getLatestAckTimestamp() {
+        return latestAckedSubtaskStatistics != null ?
+            latestAckedSubtaskStatistics.getAckTimestamp() :
+            -1;
+    }
+
+    /**
+     * @return ID of the task the statistics belong to
+     */
+    public Long getJobVertexId() {
+        return jobVertexId;
+    }
+
+    /**
+     * @return the internal per-subtask array itself (not a copy); entries are {@code null} for
+     *     subtasks without recorded statistics
+     */
+    public SubtaskStatistics[] getSubtaskStats() {
+        return subtaskStats;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index a2b71303d..0f5dcdb52 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
@@ -52,6 +53,7 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
 
 import com.google.common.collect.Lists;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.jet.datamodel.Tuple2;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
@@ -111,7 +113,7 @@ public class PhysicalPlanGenerator {
         this.flakeIdGenerator = flakeIdGenerator;
     }
 
-    public PhysicalPlan generate() {
+    public Tuple2<PhysicalPlan, CheckpointPlan> generate() {
 
         // TODO Determine which tasks do not need to be restored according to state
         CopyOnWriteArrayList<PassiveCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
@@ -151,11 +153,12 @@ public class PhysicalPlanGenerator {
                 jobImmutableInformation);
         });
 
-        return new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
+        PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
             executorService,
             jobImmutableInformation,
             initializationTimestamp,
             waitForCompleteBySubPlanList.toArray(new PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
+        return Tuple2.tuple2(physicalPlan, null);
     }
 
     private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
similarity index 66%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index 1acc21671..fcc24c6f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -19,22 +19,24 @@ package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
 
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.jet.datamodel.Tuple2;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
 import java.util.concurrent.ExecutorService;
 
-public class PhysicalPlanUtils {
+public class PlanUtils {
 
-    public static PhysicalPlan fromLogicalDAG(@NonNull LogicalDag logicalDag,
-                                              @NonNull NodeEngine nodeEngine,
-                                              @NonNull JobImmutableInformation jobImmutableInformation,
-                                              long initializationTimestamp,
-                                              @NonNull ExecutorService executorService,
-                                              @NonNull FlakeIdGenerator flakeIdGenerator) {
+    public static Tuple2<PhysicalPlan, CheckpointPlan> fromLogicalDAG(@NonNull LogicalDag logicalDag,
+                                                                      @NonNull NodeEngine nodeEngine,
+                                                                      @NonNull JobImmutableInformation jobImmutableInformation,
+                                                                      long initializationTimestamp,
+                                                                      @NonNull ExecutorService executorService,
+                                                                      @NonNull FlakeIdGenerator flakeIdGenerator) {
         return new PhysicalPlanGenerator(
                 new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
                 nodeEngine,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java
new file mode 100644
index 000000000..fdec3ad8f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import java.io.Serializable;
+/** Serializable holder of four ids: job, pipeline, job vertex and index. All fields are final. */
+public class TaskInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    /** Id of the job; CheckpointAckOperation and TaskCompletedOperation use it to look up the JobMaster. */
+    private final Long jobId;
+    /** Id of the pipeline; the same operations use it to look up the CheckpointCoordinator. */
+    private final Long pipelineId;
+    /** NOTE(review): presumably the id of the job vertex this task belongs to - confirm against callers. */
+    private final Long jobVertexId;
+    /** NOTE(review): presumably the subtask index within the job vertex - confirm against callers. */
+    private final Integer index;
+    /** Stores the arguments as given; nothing is validated, so each boxed value may be null. */
+    public TaskInfo(Long jobId, Long pipelineId, Long jobVertexId, Integer index) {
+        this.jobId = jobId;
+        this.pipelineId = pipelineId;
+        this.jobVertexId = jobVertexId;
+        this.index = index;
+    }
+
+    public Long getJobId() {
+        return jobId;
+    }
+
+    public Long getPipelineId() {
+        return pipelineId;
+    }
+
+    public Long getJobVertexId() {
+        return jobVertexId;
+    }
+
+    public Integer getIndex() {
+        return index;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1cdc0b5f7..3af7ebe12 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -23,8 +23,10 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -32,6 +34,7 @@ import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.jet.datamodel.Tuple2;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngine;
@@ -55,6 +58,8 @@ public class JobMaster implements Runnable {
 
     private ResourceManager resourceManager;
 
+    private CheckpointManager checkpointManager;
+
     private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
 
     private JobImmutableInformation jobImmutableInformation;
@@ -80,12 +85,13 @@ public class JobMaster implements Runnable {
 
         // TODO Use classloader load the connector jars and deserialize logicalDag
         this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
-        physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
+        final Tuple2<PhysicalPlan, CheckpointPlan> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
             jobImmutableInformation,
             System.currentTimeMillis(),
             executorService,
             flakeIdGenerator);
+        physicalPlan = planTuple.f0();
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -126,6 +132,10 @@ public class JobMaster implements Runnable {
         return resourceManager;
     }
 
+    public CheckpointManager getCheckpointManager() {
+        return checkpointManager; // NOTE(review): no assignment to this field is visible in this change; may be null
+    }
+
     public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
         return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
index 6de6b3f75..6947b2fad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.execution.TaskInfo;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +28,43 @@ import com.hazelcast.nio.ObjectDataOutput;
 
 import java.io.IOException;
 
-public class CheckpointTriggerOperation extends AsyncOperation {
-    private CheckpointBarrier checkpointBarrier;
+public class CheckpointAckOperation extends AsyncOperation {
+    private TaskInfo taskInfo;
 
-    public CheckpointTriggerOperation() {
+    private byte[] states;
+
+    public CheckpointAckOperation() {
     }
 
-    public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
-        this.checkpointBarrier = checkpointBarrier;
+    public CheckpointAckOperation(TaskInfo taskInfo, byte[] states) {
+        this.taskInfo = taskInfo;
+        this.states = states;
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+        return OperationDataSerializerHook.CHECKPOINT_ACK_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeObject(checkpointBarrier);
+        out.writeObject(taskInfo);
+        out.writeByteArray(states);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        checkpointBarrier = in.readObject(CheckpointBarrier.class);
+        taskInfo = in.readObject(TaskInfo.class);
+        states = in.readByteArray();
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        // TODO: All source Vertexes executed
+        CheckpointCoordinator checkpointCoordinator = ((SeaTunnelServer) getService())
+            .getJobMaster(taskInfo.getJobId())
+            .getCheckpointManager()
+            .getCheckpointCoordinator(taskInfo.getPipelineId());
+        // TODO: notify coordinator; for now the coordinator looked up above is unused and null is returned
         return null;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
index 6de6b3f75..2d061482e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +25,46 @@ import com.hazelcast.nio.ObjectDataOutput;
 
 import java.io.IOException;
 
-public class CheckpointTriggerOperation extends AsyncOperation {
-    private CheckpointBarrier checkpointBarrier;
+public class CheckpointFinishedOperation extends AsyncOperation {
+    private long jobId;
 
-    public CheckpointTriggerOperation() {
+    private long pipelineId;
+
+    private long checkpointId;
+
+    public CheckpointFinishedOperation() {
     }
 
-    public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
-        this.checkpointBarrier = checkpointBarrier;
+    public CheckpointFinishedOperation(long jobId,
+                                       long pipelineId,
+                                       long checkpointId) {
+        this.jobId = jobId;
+        this.pipelineId = pipelineId;
+        this.checkpointId = checkpointId;
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+        return OperationDataSerializerHook.CHECKPOINT_FINISHED_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeObject(checkpointBarrier);
+        out.writeLong(jobId);
+        out.writeLong(pipelineId);
+        out.writeLong(checkpointId);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        checkpointBarrier = in.readObject(CheckpointBarrier.class);
+        jobId = in.readLong();
+        pipelineId = in.readLong();
+        checkpointId = in.readLong();
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        // TODO: All source Vertexes executed
+        // TODO: Notifies all tasks of the pipeline about the status of the checkpoint
         return null;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
index 6de6b3f75..235cd010e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -38,7 +38,7 @@ public class CheckpointTriggerOperation extends AsyncOperation {
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+        return OperationDataSerializerHook.CHECKPOINT_TRIGGER_OPERATOR;
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
index 6de6b3f75..fbca114c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.engine.server.operation;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.execution.TaskInfo;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +28,38 @@ import com.hazelcast.nio.ObjectDataOutput;
 
 import java.io.IOException;
 
-public class CheckpointTriggerOperation extends AsyncOperation {
-    private CheckpointBarrier checkpointBarrier;
+public class TaskCompletedOperation extends AsyncOperation {
+    private TaskInfo taskInfo;
 
-    public CheckpointTriggerOperation() {
+    public TaskCompletedOperation() {
     }
 
-    public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
-        this.checkpointBarrier = checkpointBarrier;
+    public TaskCompletedOperation(TaskInfo taskInfo) {
+        this.taskInfo = taskInfo;
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+        return OperationDataSerializerHook.TASK_COMPLETED_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeObject(checkpointBarrier);
+        out.writeObject(taskInfo);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        checkpointBarrier = in.readObject(CheckpointBarrier.class);
+        taskInfo = in.readObject(TaskInfo.class);
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        // TODO: All source Vertexes executed
+        CheckpointCoordinator checkpointCoordinator = ((SeaTunnelServer) getService())
+            .getJobMaster(taskInfo.getJobId())
+            .getCheckpointManager()
+            .getCheckpointCoordinator(taskInfo.getPipelineId());
+        // TODO: notify coordinator; for now the coordinator looked up above is unused and null is returned
         return null;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index d5ade4d59..2a40f871b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,10 +18,13 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CheckpointAckOperation;
+import org.apache.seatunnel.engine.server.operation.CheckpointFinishedOperation;
 import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
 import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.operation.TaskCompletedOperation;
 import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -40,9 +43,15 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
     public static final int SUBMIT_OPERATOR = 1;
     public static final int DEPLOY_TASK_OPERATOR = 2;
-    public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 3;
 
-    public static final int CHECKPOINT_TRIGGER_OPERATOR = 4;
+    public static final int TASK_COMPLETED_OPERATOR = 3;
+    public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 4;
+    // NOTE(review): this commit renumbers WAIT_FORM_JOB_COMPLETE (3 -> 4) and CHECKPOINT_TRIGGER (4 -> 5) - confirm intended.
+    public static final int CHECKPOINT_TRIGGER_OPERATOR = 5;
+
+    public static final int CHECKPOINT_ACK_OPERATOR = 6;
+
+    public static final int CHECKPOINT_FINISHED_OPERATOR = 7;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -70,10 +79,16 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
                     return new SubmitJobOperation();
                 case DEPLOY_TASK_OPERATOR:
                     return new DeployTaskOperation();
+                case TASK_COMPLETED_OPERATOR:
+                    return new TaskCompletedOperation();
                 case WAIT_FORM_JOB_COMPLETE_OPERATOR:
                     return new WaitForJobCompleteOperation();
                 case CHECKPOINT_TRIGGER_OPERATOR:
                     return new CheckpointTriggerOperation();
+                case CHECKPOINT_ACK_OPERATOR:
+                    return new CheckpointAckOperation();
+                case CHECKPOINT_FINISHED_OPERATOR:
+                    return new CheckpointFinishedOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index e570aeb0c..bca5615c8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -61,7 +61,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
             task.receivedReader(readerTaskID, readerAddress);
             return null;
         }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
-                exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
+            exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
 
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 5c78af388..b5f43ad3b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -36,7 +36,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
@@ -145,11 +145,11 @@ public class TaskTest {
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
                 nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
-        PhysicalPlan physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
+        PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
                 jobImmutableInformation,
                 System.currentTimeMillis(),
                 Executors.newCachedThreadPool(),
-                instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME));
+                instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
 
         Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
         Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);