You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/01/13 06:49:04 UTC

[incubator-seatunnel] branch dev updated: [feature][ST-Engine] add savepoint and restore with savepoint (#3930)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f0346c769 [feature][ST-Engine] add savepoint and restore with savepoint (#3930)
f0346c769 is described below

commit f0346c7694e5abd0cf4a9d6d7040cae7a89a5d24
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri Jan 13 14:48:58 2023 +0800

    [feature][ST-Engine] add savepoint and restore with savepoint (#3930)
---
 docs/en/seatunnel-engine/savepoint.md              |  20 ++++
 release-note.md                                    |   1 +
 .../starter/seatunnel/args/ClientCommandArgs.java  |  16 +++
 .../seatunnel/command/ClientExecuteCommand.java    |  15 ++-
 .../seatunnel/engine/client/SeaTunnelClient.java   |  14 +++
 .../engine/client/SeaTunnelClientInstance.java     |   2 +
 .../engine/client/job/JobExecutionEnvironment.java |  25 +++-
 .../engine/client/SeaTunnelClientTest.java         |  33 ++++++
 .../engine/core/job/JobImmutableInformation.java   |  15 ++-
 .../protocol/codec/SeaTunnelSavePointJobCodec.java |  73 ++++++++++++
 .../SeaTunnelEngine.yaml                           |  16 ++-
 .../engine/server/CoordinatorService.java          |  13 +++
 .../engine/server/TaskExecutionService.java        |   3 +-
 .../server/checkpoint/CheckpointCoordinator.java   |  12 +-
 .../server/checkpoint/CheckpointManager.java       |  18 ++-
 .../seatunnel/engine/server/master/JobMaster.java  |  11 ++
 .../server/operation/SavePointJobOperation.java    |  43 +++++++
 .../server/protocol/task/SavePointJobTask.java     |  49 ++++++++
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   3 +
 .../ClientToServerOperationDataSerializerHook.java |   5 +
 .../engine/server/AbstractSeaTunnelServerTest.java |   8 ++
 .../server/checkpoint/CheckpointManagerTest.java   |   1 +
 .../engine/server/checkpoint/SavePointTest.java    | 128 +++++++++++++++++++++
 .../stream_fakesource_to_file_savepoint.conf       |  65 +++++++++++
 24 files changed, 576 insertions(+), 13 deletions(-)

diff --git a/docs/en/seatunnel-engine/savepoint.md b/docs/en/seatunnel-engine/savepoint.md
new file mode 100644
index 000000000..6eb1ee004
--- /dev/null
+++ b/docs/en/seatunnel-engine/savepoint.md
@@ -0,0 +1,20 @@
+---
+sidebar_position: 5
+---
+
+# savepoint and restore with savepoint
+A savepoint is created using the checkpoint mechanism. It is a global mirror of the job execution status, which can be used to stop and recover a job or SeaTunnel itself, to perform upgrades, etc.
+
+## use savepoint
+To use savepoint, you need to ensure that the connector used by the job supports checkpointing; otherwise, data may be lost or duplicated.  
+
+1. Make sure the job is running  
+
+2. Use the following command to trigger savepoint:  
+   ```./bin/seatunnel.sh -s {jobId}```   
+
+After successful execution, the checkpoint data will be saved and the task will end.
+
+## use restore with savepoint
+Resume from savepoint using jobId  
+   ```./bin/seatunnel.sh -c {jobConfig} -r {jobId}```   
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index a9ffc530c..8e237b609 100644
--- a/release-note.md
+++ b/release-note.md
@@ -39,6 +39,7 @@
 
 ### Zeta Engine
 - [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
+- [Checkpoint] Add savepoint and restore with savepoint #3930
 - [Core]Fix Local Mode can't deserialize split (#3817)
 - [Metrics] Fix Metrics will lose when Job be canceled. #3797
 
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 4455d8ecc..82046db0e 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,6 +37,14 @@ public class ClientCommandArgs extends AbstractCommandArgs {
         converter = SeaTunnelMasterTargetConverter.class)
     private MasterType masterType = MasterType.CLUSTER;
 
+    @Parameter(names = {"-r", "--restore"},
+        description = "restore with savepoint by jobId")
+    private String restoreJobId;
+
+    @Parameter(names = {"-s", "--savepoint"},
+        description = "savepoint job by jobId")
+    private String savePointJobId;
+
     @Parameter(names = {"-cn", "--cluster"},
         description = "The name of cluster")
     private String clusterName = "seatunnel_default_cluster";
@@ -103,6 +111,14 @@ public class ClientCommandArgs extends AbstractCommandArgs {
         return metricsJobId;
     }
 
+    public String getRestoreJobId(){
+        return restoreJobId;
+    }
+
+    public String getSavePointJobId(){
+        return savePointJobId;
+    }
+
     public void setMetricsJobId(String metricsJobId) {
         this.metricsJobId = metricsJobId;
     }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 0fbd3ee5e..ede92ef3c 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -92,13 +92,22 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
             } else if (null != clientCommandArgs.getMetricsJobId()) {
                 String jobMetrics = engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
                 System.out.println(jobMetrics);
-            } else {
+            } else if (null != clientCommandArgs.getSavePointJobId()){
+                engineClient.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
+            }
+            else {
                 Path configFile = FileUtils.getConfigPath(clientCommandArgs);
                 checkConfigExist(configFile);
                 JobConfig jobConfig = new JobConfig();
+                JobExecutionEnvironment jobExecutionEnv;
                 jobConfig.setName(clientCommandArgs.getJobName());
-                JobExecutionEnvironment jobExecutionEnv =
-                    engineClient.createExecutionContext(configFile.toString(), jobConfig);
+                if (null != clientCommandArgs.getRestoreJobId()) {
+                    jobExecutionEnv = engineClient.restoreExecutionContext(configFile.toString(), jobConfig,
+                        Long.parseLong(clientCommandArgs.getRestoreJobId()));
+                } else {
+                    jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
+                }
+
                 // get job start time
                 startTime = LocalDateTime.now();
                 // create job proxy
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 7ed4ab949..a114a6037 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCod
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -52,6 +53,12 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
         return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
     }
 
+    @Override
+    public JobExecutionEnvironment restoreExecutionContext(@NonNull String filePath, @NonNull JobConfig jobConfig,
+                                                           @NonNull Long jobId) {
+        return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
+    }
+
     @Override
     public JobClient createJobClient() {
         return new JobClient(hazelcastClient);
@@ -119,6 +126,13 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
         );
     }
 
+    public void savePointJob(Long jobId){
+        PassiveCompletableFuture<Void> cancelFuture = hazelcastClient.requestOnMasterAndGetCompletableFuture(
+            SeaTunnelSavePointJobCodec.encodeRequest(jobId));
+
+        cancelFuture.join();
+    }
+
     public void cancelJob(Long jobId) {
         PassiveCompletableFuture<Void> cancelFuture = hazelcastClient.requestOnMasterAndGetCompletableFuture(
             SeaTunnelCancelJobCodec.encodeRequest(jobId));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 3e52d5d16..ba8e7f798 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -27,6 +27,8 @@ public interface SeaTunnelClientInstance {
 
     JobExecutionEnvironment createExecutionContext(@NonNull String filePath, @NonNull JobConfig config);
 
+    JobExecutionEnvironment restoreExecutionContext(@NonNull String filePath, @NonNull JobConfig config, @NonNull Long jobId);
+
     JobClient createJobClient();
 
     void close();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index d2bd4fd88..11ebbe631 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -52,6 +52,8 @@ public class JobExecutionEnvironment {
 
     private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);
 
+    private final boolean isStartWithSavePoint;
+
     private final JobConfig jobConfig;
 
     private final int maxParallelism = 1;
@@ -70,15 +72,21 @@ public class JobExecutionEnvironment {
 
     private final JobClient jobClient;
 
+    /**
+     * If the JobId is not empty, it is used to restore job from savePoint
+     */
     public JobExecutionEnvironment(JobConfig jobConfig,
                                    String jobFilePath,
-                                   SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
+                                   SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+                                   boolean isStartWithSavePoint,
+                                   Long jobId) {
         this.jobConfig = jobConfig;
         this.jobFilePath = jobFilePath;
         this.idGenerator = new IdGenerator();
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
         this.jobClient = new JobClient(seaTunnelHazelcastClient);
-        this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
+        this.isStartWithSavePoint = isStartWithSavePoint;
+        this.jobConfig.setJobContext(new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
         this.commonPluginJars.addAll(searchPluginJars());
         this.commonPluginJars.addAll(new ArrayList<>(Common.getThirdPartyJars(jobConfig.getEnvOptions()
                 .getOrDefault(EnvCommonOptions.JARS.key(), "").toString()).stream().map(Path::toUri)
@@ -93,6 +101,18 @@ public class JobExecutionEnvironment {
         LOGGER.info("add common jar in plugins :" + commonPluginJars);
     }
 
+    public JobExecutionEnvironment(JobConfig jobConfig,
+                                   String jobFilePath,
+                                   SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
+        this(
+            jobConfig,
+            jobFilePath,
+            seaTunnelHazelcastClient,
+            false,
+            null
+        );
+    }
+
     /**
      * Search all jars in SEATUNNEL_HOME/plugins
      */
@@ -118,6 +138,7 @@ public class JobExecutionEnvironment {
     public ClientJobProxy execute() throws ExecutionException, InterruptedException {
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
             Long.parseLong(jobConfig.getJobContext().getJobId()),
+            isStartWithSavePoint,
             seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
             jobConfig,
             new ArrayList<>(jarUrls));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f9114419a..96f2f97a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -218,6 +218,39 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testSavePointAndRestoreWithSavePoint() throws ExecutionException, InterruptedException {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("streaming_fake_to_console.conf");
+
+        JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+        CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+        long jobId = clientJobProxy.getJobId();
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+
+        CLIENT.savePointJob(jobId);
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("FINISHED", CLIENT.getJobStatus(jobId)));
+
+        Thread.sleep(1000);
+        CLIENT.restoreExecutionContext(filePath, jobConfig, jobId).execute();
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+
+        CLIENT.cancelJob(jobId);
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
+
+    }
+
     @AfterEach
     void tearDown() {
         CLIENT.close();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
index 577d52e0c..c7ca87ce8 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
@@ -34,6 +34,8 @@ import java.util.List;
 public class JobImmutableInformation implements IdentifiedDataSerializable {
     private long jobId;
 
+    private boolean isStartWithSavePoint;
+
     private long createTime;
 
     private Data logicalDag;
@@ -45,18 +47,27 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
     public JobImmutableInformation() {
     }
 
-    public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+    public JobImmutableInformation(long jobId, @NonNull boolean isStartWithSavePoint, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
         this.createTime = System.currentTimeMillis();
         this.jobId = jobId;
+        this.isStartWithSavePoint = isStartWithSavePoint;
         this.logicalDag = logicalDag;
         this.jobConfig = jobConfig;
         this.pluginJarsUrls = pluginJarsUrls;
     }
 
+    public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+        this(jobId, false, logicalDag, jobConfig, pluginJarsUrls);
+    }
+
     public long getJobId() {
         return jobId;
     }
 
+    public boolean isStartWithSavePoint() {
+        return isStartWithSavePoint;
+    }
+
     public long getCreateTime() {
         return createTime;
     }
@@ -86,6 +97,7 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeLong(jobId);
+        out.writeBoolean(isStartWithSavePoint);
         out.writeLong(createTime);
         IOUtil.writeData(out, logicalDag);
         out.writeObject(jobConfig);
@@ -96,6 +108,7 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         jobId = in.readLong();
+        isStartWithSavePoint = in.readBoolean();
         createTime = in.readLong();
         logicalDag = IOUtil.readData(in);
         jobConfig = in.readObject();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java
new file mode 100644
index 000000000..5814c63d9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.*;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;
+
+/*
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+ */
+/**
+ */
+@Generated("b29b3b6c7451e2940ccd4cd386f32e34")
+public final class SeaTunnelSavePointJobCodec {
+    //hex: 0xDE0A00
+    public static final int REQUEST_MESSAGE_TYPE = 14551552;
+    //hex: 0xDE0A01
+    public static final int RESPONSE_MESSAGE_TYPE = 14551553;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelSavePointJobCodec() {
+    }
+
+    public static ClientMessage encodeRequest(long jobId) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.SavePointJob");
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    /**
+     */
+    public static long decodeRequest(ClientMessage clientMessage) {
+        ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+        ClientMessage.Frame initialFrame = iterator.next();
+        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+    }
+
+    public static ClientMessage encodeResponse() {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        return clientMessage;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 3340dae4f..7b809adb4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -193,4 +193,18 @@ methods:
           type: Data
           nullable: false
           since: 2.0
-          doc: ''
\ No newline at end of file
+          doc: ''
+  - id: 10
+    name: savePointJob
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
+    response: {}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index dfc92ce38..6d31bef31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -354,6 +354,19 @@ public class CoordinatorService {
         return new PassiveCompletableFuture<>(voidCompletableFuture);
     }
 
+    public PassiveCompletableFuture<Void> savePoint(long jobId){
+        CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
+        if (!runningJobMasterMap.containsKey(jobId)) {
+            Throwable throwable = new Throwable("The jobId: " + jobId + "of savePoint does not exist");
+            logger.warning(throwable);
+            voidCompletableFuture.completeExceptionally(throwable);
+        } else {
+            JobMaster jobMaster = runningJobMasterMap.get(jobId);
+            voidCompletableFuture = jobMaster.savePoint();
+        }
+        return new PassiveCompletableFuture<>(voidCompletableFuture);
+    }
+
     private void onJobDone(JobMaster jobMaster, long jobId){
         // storage job state and metrics to HistoryStorage
         jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2b3aff44b..aba4180f4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -249,7 +249,8 @@ public class TaskExecutionService implements DynamicMetricsProvider {
         }
         resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
             logger.info(
-                String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
+                String.format("Task %s complete with state %s", r != null ? r.getTaskGroupLocation() : "null",
+                    r != null ? r.getExecutionState() : "null"));
             notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), r);
         }));
         return new PassiveCompletableFuture<>(resultFuture);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 545297292..4d8229377 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
 import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.CHECKPOINT_TYPE;
 import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
+import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.SAVEPOINT_TYPE;
 import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
 import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
 
@@ -329,7 +330,7 @@ public class CheckpointCoordinator {
     }
 
     public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
-        CompletableFuture<PendingCheckpoint> savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), CheckpointType.SAVEPOINT_TYPE);
+        CompletableFuture<PendingCheckpoint> savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
         startTriggerPendingCheckpoint(savepoint);
         return savepoint.join().getCompletableFuture();
     }
@@ -502,7 +503,7 @@ public class CheckpointCoordinator {
         }
 
         pendingCheckpoint.acknowledgeTask(location, ackOperation.getStates(),
-            CheckpointType.SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() ?
+            SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() ?
                 SubtaskStatus.SAVEPOINT_PREPARE_CLOSE :
                 SubtaskStatus.RUNNING);
     }
@@ -564,4 +565,11 @@ public class CheckpointCoordinator {
         }
         return latestCompletedCheckpoint.getCheckpointType() == COMPLETED_POINT_TYPE;
     }
+
+    public boolean isEndOfSavePoint() {
+        if (latestCompletedCheckpoint == null) {
+            return false;
+        }
+        return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index eddca4298..a31766b0c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -78,6 +78,7 @@ public class CheckpointManager {
     private final JobMaster jobMaster;
 
     public CheckpointManager(long jobId,
+                             boolean isStartWithSavePoint,
                              NodeEngine nodeEngine,
                              JobMaster jobMaster,
                              Map<Integer, CheckpointPlan> checkpointPlanMap,
@@ -96,9 +97,15 @@ public class CheckpointManager {
                 IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
                 try {
                     idCounter.start();
-                    // TODO: support savepoint
                     PipelineState pipelineState = null;
-                    if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
+                    if (isStartWithSavePoint){
+                        pipelineState = checkpointStorage.getLatestCheckpointByJobIdAndPipelineId(String.valueOf(jobId),
+                                String.valueOf(plan.getPipelineId()));
+                        long checkpointId = pipelineState.getCheckpointId();
+                        idCounter.setCount(checkpointId);
+                        log.info("pipeline({}) start with savePoint on checkPointId({})", plan.getPipelineId(), checkpointId);
+                    }
+                    else if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
                         pipelineState =
                             checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()),
                                 String.valueOf(idCounter.get() - 1));
@@ -198,7 +205,7 @@ public class CheckpointManager {
      * <br> Listen to the {@link JobStatus} of the {@link Job}.
      */
     public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
-        if (jobStatus == JobStatus.FINISHED) {
+        if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED) && !isSavePointEnd()) {
             checkpointStorage.deleteCheckpoint(jobId + "");
         }
         return CompletableFuture.completedFuture(null);
@@ -225,6 +232,11 @@ public class CheckpointManager {
         coordinator.acknowledgeTask(ackOperation);
     }
 
+    private boolean isSavePointEnd() {
+        return coordinatorMap.values().stream().map(CheckpointCoordinator::isEndOfSavePoint)
+            .reduce((v1, v2) -> v1 && v2).orElse(false);
+    }
+
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
         return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
             jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3940e2744..35fbe8531 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
+import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
 import org.apache.seatunnel.engine.server.dag.DAGUtils;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
@@ -177,6 +178,7 @@ public class JobMaster {
         this.physicalPlan.setJobMaster(this);
         this.checkpointManager = new CheckpointManager(
             jobImmutableInformation.getJobId(),
+            jobImmutableInformation.isStartWithSavePoint(),
             nodeEngine,
             this,
             planTuple.f1(),
@@ -380,6 +382,15 @@ public class JobMaster {
         });
     }
 
+    /**
+     * Triggers savepoints through the checkpoint manager. Executing a savepoint will cause the
+     * job to end.
+     *
+     * @return a future that completes once every future returned by
+     *     {@code checkpointManager.triggerSavepoints()} has completed
+     */
+    public CompletableFuture<Void> savePoint() {
+        PassiveCompletableFuture<CompletedCheckpoint>[] savepointFutures = checkpointManager.triggerSavepoints();
+        return CompletableFuture.allOf(savepointFutures);
+    }
+
     public Map<TaskGroupLocation, SlotProfile> getOwnedSlotProfiles(PipelineLocation pipelineLocation) {
         return ownedSlotProfilesIMap.get(pipelineLocation);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java
new file mode 100644
index 000000000..9b39fd733
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+
+/**
+ * Asynchronous job operation that asks the coordinator service to take a savepoint of the job
+ * identified by {@code jobId}.
+ */
+public class SavePointJobOperation extends AbstractJobAsyncOperation {
+    /**
+     * No-argument constructor, used when the operation is instantiated by class id in
+     * {@link ClientToServerOperationDataSerializerHook}.
+     */
+    public SavePointJobOperation() {
+    }
+
+    /**
+     * @param jobId id of the job to take a savepoint of
+     */
+    public SavePointJobOperation(long jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Delegates to the coordinator service's {@code savePoint(jobId)}.
+     *
+     * @return the future returned by the coordinator service
+     */
+    @Override
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
+        SeaTunnelServer server = getService();
+        return server.getCoordinatorService().savePoint(jobId);
+    }
+
+    /**
+     * @return {@link ClientToServerOperationDataSerializerHook#SAVEPOINT_JOB_OPERATOR}
+     */
+    @Override
+    public int getClassId() {
+        return ClientToServerOperationDataSerializerHook.SAVEPOINT_JOB_OPERATOR;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java
new file mode 100644
index 000000000..a5b17f168
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Message task for the client savepoint request. The request is decoded with
+ * {@link SeaTunnelSavePointJobCodec} into the job id, which is handed to a
+ * {@link SavePointJobOperation}; the response carries no payload.
+ */
+public class SavePointJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+    /**
+     * Wires the savepoint codec's request decoder and response encoder into the base task.
+     */
+    protected SavePointJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelSavePointJobCodec::decodeRequest,
+            ignored -> SeaTunnelSavePointJobCodec.encodeResponse());
+    }
+
+    /**
+     * @return a new {@link SavePointJobOperation} for the decoded job id
+     */
+    @Override
+    protected Operation prepareOperation() {
+        return new SavePointJobOperation(parameters);
+    }
+
+    /**
+     * @return the fixed method name {@code "savePointJob"}
+     */
+    @Override
+    public String getMethodName() {
+        return "savePointJob";
+    }
+
+    /**
+     * @return an empty array; no parameters are reported for this task
+     */
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index bed8e64d0..627ea89fa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCod
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 
@@ -67,5 +68,7 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
             (clientMessage, connection) -> new GetJobMetricsTask(clientMessage, node, connection));
         factories.put(SeaTunnelGetJobInfoCodec.REQUEST_MESSAGE_TYPE,
             (clientMessage, connection) -> new GetJobInfoTask(clientMessage, node, connection));
+        factories.put(SeaTunnelSavePointJobCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new SavePointJobTask(clientMessage, node, connection));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index be650c531..2ed57972b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
 
@@ -55,6 +56,8 @@ public final class ClientToServerOperationDataSerializerHook implements DataSeri
 
     public static final int GET_JOB_INFO_OPERATION = 7;
 
+    public static final int SAVEPOINT_JOB_OPERATOR = 8;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -91,6 +94,8 @@ public final class ClientToServerOperationDataSerializerHook implements DataSeri
                     return new GetJobDetailStatusOperation();
                 case GET_JOB_INFO_OPERATION:
                     return new GetJobInfoOperation();
+                case SAVEPOINT_JOB_OPERATOR:
+                    return new SavePointJobOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index af45774a6..e272d539a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -63,4 +63,12 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer
             log.error(ExceptionUtils.getMessage(e));
         }
     }
+
+    /**
+     * Invokes {@code after()} and then {@code before()} on this test instance.
+     * For tests that require a cluster restart.
+     */
+    public void restartServer() {
+        after();
+        before();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 54a33af7b..bfb1f2125 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -66,6 +66,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
         planMap.put(1, CheckpointPlan.builder().pipelineId(1).build());
         CheckpointManager checkpointManager = new CheckpointManager(
             jobId,
+            false,
             nodeEngine,
             null,
             planMap,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
new file mode 100644
index 000000000..47a7f7624
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static org.awaitility.Awaitility.await;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Exercises savepoint and restore-from-savepoint on the test server: a streaming job described by
+ * {@link #CONF_PATH} is stopped with a savepoint, resubmitted with the start-with-savepoint flag,
+ * and the number of lines written under {@link #OUT_PATH} is checked at the end.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public class SavePointTest extends AbstractSeaTunnelServerTest {
+    public static final String OUT_PATH = "/tmp/hive/warehouse/test3";
+    public static final String CONF_PATH = "stream_fakesource_to_file_savepoint.conf";
+    public static final long JOB_ID = 823342L;
+
+    /** Upper bound, in milliseconds, for each wait on a job state change. */
+    private static final long AWAIT_TIMEOUT_MS = 120000;
+
+    /** Savepoint and restore without restarting the server in between. */
+    @Test
+    @Disabled()
+    public void testSavePoint() throws InterruptedException {
+        savePointAndRestore(false);
+    }
+
+    /** Savepoint and restore with a server restart between the savepoint and the restore. */
+    @Test
+    @Disabled()
+    public void testSavePointOnServerRestart() throws InterruptedException {
+        savePointAndRestore(true);
+    }
+
+    /**
+     * Runs the whole savepoint/restore scenario.
+     *
+     * @param needRestart whether to restart the server after the savepoint has completed and
+     *     before the job is resumed
+     * @throws InterruptedException if the thread is interrupted during one of the fixed sleeps
+     */
+    public void savePointAndRestore(boolean needRestart) throws InterruptedException {
+        FileUtils.createNewDir(OUT_PATH);
+
+        // 1. Start a streaming mode job
+        startJob(JOB_ID, CONF_PATH, false);
+
+        // 2. Wait until the job is running and has started writing output.
+        //    Two separate assertions so that a failure reports which condition was not met.
+        await().atMost(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                Assertions.assertEquals(JobStatus.RUNNING, server.getCoordinatorService().getJobStatus(JOB_ID));
+                Assertions.assertTrue(FileUtils.getFileLineNumberFromDir(OUT_PATH) > 10);
+            });
+
+        // 3. Start the savepoint
+        server.getCoordinatorService().savePoint(JOB_ID);
+
+        // 4. Wait for the savepoint to complete
+        awaitJobStatus(JobStatus.FINISHED);
+
+        Thread.sleep(1000);
+
+        // Restart the server if requested
+        if (needRestart) {
+            this.restartServer();
+        }
+
+        Thread.sleep(1000);
+
+        // 5. Resume from the savepoint
+        startJob(JOB_ID, CONF_PATH, true);
+
+        awaitJobStatus(JobStatus.RUNNING);
+
+        // 6. Run long enough to ensure that the data write is complete
+        Thread.sleep(30000);
+
+        server.getCoordinatorService().cancelJob(JOB_ID);
+
+        awaitJobStatus(JobStatus.CANCELED);
+
+        // 7. Check the final data count
+        Assertions.assertEquals(100, FileUtils.getFileLineNumberFromDir(OUT_PATH));
+
+        Thread.sleep(1000);
+    }
+
+    /**
+     * Polls the coordinator service until {@link #JOB_ID} reports the expected status, failing
+     * after {@link #AWAIT_TIMEOUT_MS}.
+     *
+     * @param expected the status the job must reach
+     */
+    private void awaitJobStatus(JobStatus expected) {
+        await().atMost(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .untilAsserted(() ->
+                Assertions.assertEquals(expected, server.getCoordinatorService().getJobStatus(JOB_ID)));
+    }
+
+    /**
+     * Builds the logical plan from the given config, wraps it in a
+     * {@link JobImmutableInformation}, submits it and waits for the submission future.
+     *
+     * @param jobId id to submit the job under
+     * @param path name of the job config resource
+     * @param isStartWithSavePoint value of the start-with-savepoint flag passed to
+     *     {@link JobImmutableInformation}
+     */
+    private void startJob(long jobId, String path, boolean isStartWithSavePoint) {
+        LogicalDag testLogicalDag =
+            TestUtils.createTestLogicalPlan(path, String.valueOf(jobId), jobId);
+
+        JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId, isStartWithSavePoint,
+            nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+            Collections.emptyList());
+
+        Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> submitFuture =
+            server.getCoordinatorService().submitJob(jobId, data);
+        submitFuture.join();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf
new file mode 100644
index 000000000..ced94b31e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set the job environment configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin, **intended only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+       row.num = 100
+       split.num = 5
+       split.read-interval = 3000
+       parallelism = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test3"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+
+  }
+}
\ No newline at end of file