You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/01/13 06:49:04 UTC
[incubator-seatunnel] branch dev updated: [feature][ST-Engine] add savepoint and restore with savepoint (#3930)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f0346c769 [feature][ST-Engine] add savepoint and restore with savepoint (#3930)
f0346c769 is described below
commit f0346c7694e5abd0cf4a9d6d7040cae7a89a5d24
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri Jan 13 14:48:58 2023 +0800
[feature][ST-Engine] add savepoint and restore with savepoint (#3930)
---
docs/en/seatunnel-engine/savepoint.md | 20 ++++
release-note.md | 1 +
.../starter/seatunnel/args/ClientCommandArgs.java | 16 +++
.../seatunnel/command/ClientExecuteCommand.java | 15 ++-
.../seatunnel/engine/client/SeaTunnelClient.java | 14 +++
.../engine/client/SeaTunnelClientInstance.java | 2 +
.../engine/client/job/JobExecutionEnvironment.java | 25 +++-
.../engine/client/SeaTunnelClientTest.java | 33 ++++++
.../engine/core/job/JobImmutableInformation.java | 15 ++-
.../protocol/codec/SeaTunnelSavePointJobCodec.java | 73 ++++++++++++
.../SeaTunnelEngine.yaml | 16 ++-
.../engine/server/CoordinatorService.java | 13 +++
.../engine/server/TaskExecutionService.java | 3 +-
.../server/checkpoint/CheckpointCoordinator.java | 12 +-
.../server/checkpoint/CheckpointManager.java | 18 ++-
.../seatunnel/engine/server/master/JobMaster.java | 11 ++
.../server/operation/SavePointJobOperation.java | 43 +++++++
.../server/protocol/task/SavePointJobTask.java | 49 ++++++++
.../task/SeaTunnelMessageTaskFactoryProvider.java | 3 +
.../ClientToServerOperationDataSerializerHook.java | 5 +
.../engine/server/AbstractSeaTunnelServerTest.java | 8 ++
.../server/checkpoint/CheckpointManagerTest.java | 1 +
.../engine/server/checkpoint/SavePointTest.java | 128 +++++++++++++++++++++
.../stream_fakesource_to_file_savepoint.conf | 65 +++++++++++
24 files changed, 576 insertions(+), 13 deletions(-)
diff --git a/docs/en/seatunnel-engine/savepoint.md b/docs/en/seatunnel-engine/savepoint.md
new file mode 100644
index 000000000..6eb1ee004
--- /dev/null
+++ b/docs/en/seatunnel-engine/savepoint.md
@@ -0,0 +1,20 @@
+---
+sidebar_position: 5
+---
+
+# savepoint and restore with savepoint
+A savepoint is created using the checkpoint mechanism. It is a global mirror of the job execution status, which can be used to stop and recover a job or SeaTunnel itself, to upgrade, etc.
+
+## use savepoint
+To use a savepoint, you need to ensure that the connectors used by the job support checkpointing; otherwise data may be lost or duplicated.
+
+1. Make sure the job is running
+
+2. Use the following command to trigger savepoint:
+ ```./bin/seatunnel.sh -s {jobId}```
+
+After successful execution, the checkpoint data will be saved and the job will end.
+
+## use restore with savepoint
+Resume a job from its savepoint by passing the jobId of the savepointed job to `-r`:
+ ```./bin/seatunnel.sh -c {jobConfig} -r {jobId}```
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index a9ffc530c..8e237b609 100644
--- a/release-note.md
+++ b/release-note.md
@@ -39,6 +39,7 @@
### Zeta Engine
- [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
+- [Checkpoint] Add savepoint and restore with savepoint #3930
- [Core]Fix Local Mode can't deserialize split (#3817)
- [Metrics] Fix Metrics will lose when Job be canceled. #3797
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 4455d8ecc..82046db0e 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -37,6 +37,14 @@ public class ClientCommandArgs extends AbstractCommandArgs {
converter = SeaTunnelMasterTargetConverter.class)
private MasterType masterType = MasterType.CLUSTER;
+ @Parameter(names = {"-r", "--restore"},
+ description = "restore with savepoint by jobId")
+ private String restoreJobId;
+
+ @Parameter(names = {"-s", "--savepoint"},
+ description = "savepoint job by jobId")
+ private String savePointJobId;
+
@Parameter(names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";
@@ -103,6 +111,14 @@ public class ClientCommandArgs extends AbstractCommandArgs {
return metricsJobId;
}
+ public String getRestoreJobId(){
+ return restoreJobId;
+ }
+
+ public String getSavePointJobId(){
+ return savePointJobId;
+ }
+
public void setMetricsJobId(String metricsJobId) {
this.metricsJobId = metricsJobId;
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 0fbd3ee5e..ede92ef3c 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -92,13 +92,22 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
} else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics = engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
System.out.println(jobMetrics);
- } else {
+ } else if (null != clientCommandArgs.getSavePointJobId()){
+ engineClient.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
+ }
+ else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
+ JobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
- JobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(configFile.toString(), jobConfig);
+ if (null != clientCommandArgs.getRestoreJobId()) {
+ jobExecutionEnv = engineClient.restoreExecutionContext(configFile.toString(), jobConfig,
+ Long.parseLong(clientCommandArgs.getRestoreJobId()));
+ } else {
+ jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
+ }
+
// get job start time
startTime = LocalDateTime.now();
// create job proxy
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 7ed4ab949..a114a6037 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCod
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -52,6 +53,12 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
}
+ @Override
+ public JobExecutionEnvironment restoreExecutionContext(@NonNull String filePath, @NonNull JobConfig jobConfig,
+ @NonNull Long jobId) {
+ return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
+ }
+
@Override
public JobClient createJobClient() {
return new JobClient(hazelcastClient);
@@ -119,6 +126,13 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
);
}
+ public void savePointJob(Long jobId){
+ PassiveCompletableFuture<Void> cancelFuture = hazelcastClient.requestOnMasterAndGetCompletableFuture(
+ SeaTunnelSavePointJobCodec.encodeRequest(jobId));
+
+ cancelFuture.join();
+ }
+
public void cancelJob(Long jobId) {
PassiveCompletableFuture<Void> cancelFuture = hazelcastClient.requestOnMasterAndGetCompletableFuture(
SeaTunnelCancelJobCodec.encodeRequest(jobId));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 3e52d5d16..ba8e7f798 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -27,6 +27,8 @@ public interface SeaTunnelClientInstance {
JobExecutionEnvironment createExecutionContext(@NonNull String filePath, @NonNull JobConfig config);
+ JobExecutionEnvironment restoreExecutionContext(@NonNull String filePath, @NonNull JobConfig config, @NonNull Long jobId);
+
JobClient createJobClient();
void close();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index d2bd4fd88..11ebbe631 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -52,6 +52,8 @@ public class JobExecutionEnvironment {
private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);
+ private final boolean isStartWithSavePoint;
+
private final JobConfig jobConfig;
private final int maxParallelism = 1;
@@ -70,15 +72,21 @@ public class JobExecutionEnvironment {
private final JobClient jobClient;
+ /**
+ * If isStartWithSavePoint is true, the given jobId is reused to restore the job from its savePoint; otherwise a new job id is requested from the job client.
+ */
public JobExecutionEnvironment(JobConfig jobConfig,
String jobFilePath,
- SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
+ SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+ boolean isStartWithSavePoint,
+ Long jobId) {
this.jobConfig = jobConfig;
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
- this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
+ this.isStartWithSavePoint = isStartWithSavePoint;
+ this.jobConfig.setJobContext(new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(new ArrayList<>(Common.getThirdPartyJars(jobConfig.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "").toString()).stream().map(Path::toUri)
@@ -93,6 +101,18 @@ public class JobExecutionEnvironment {
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}
+ public JobExecutionEnvironment(JobConfig jobConfig,
+ String jobFilePath,
+ SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
+ this(
+ jobConfig,
+ jobFilePath,
+ seaTunnelHazelcastClient,
+ false,
+ null
+ );
+ }
+
/**
* Search all jars in SEATUNNEL_HOME/plugins
*/
@@ -118,6 +138,7 @@ public class JobExecutionEnvironment {
public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
Long.parseLong(jobConfig.getJobContext().getJobId()),
+ isStartWithSavePoint,
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
new ArrayList<>(jarUrls));
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f9114419a..96f2f97a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -218,6 +218,39 @@ public class SeaTunnelClientTest {
}
}
+ @Test
+ public void testSavePointAndRestoreWithSavePoint() throws ExecutionException, InterruptedException {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("streaming_fake_to_console.conf");
+
+ JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+ long jobId = clientJobProxy.getJobId();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+
+ CLIENT.savePointJob(jobId);
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals("FINISHED", CLIENT.getJobStatus(jobId)));
+
+ Thread.sleep(1000);
+ CLIENT.restoreExecutionContext(filePath, jobConfig, jobId).execute();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+
+ CLIENT.cancelJob(jobId);
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
+
+ }
+
@AfterEach
void tearDown() {
CLIENT.close();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
index 577d52e0c..c7ca87ce8 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
@@ -34,6 +34,8 @@ import java.util.List;
public class JobImmutableInformation implements IdentifiedDataSerializable {
private long jobId;
+ private boolean isStartWithSavePoint;
+
private long createTime;
private Data logicalDag;
@@ -45,18 +47,27 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
public JobImmutableInformation() {
}
- public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+ public JobImmutableInformation(long jobId, @NonNull boolean isStartWithSavePoint, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
this.createTime = System.currentTimeMillis();
this.jobId = jobId;
+ this.isStartWithSavePoint = isStartWithSavePoint;
this.logicalDag = logicalDag;
this.jobConfig = jobConfig;
this.pluginJarsUrls = pluginJarsUrls;
}
+ public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+ this(jobId, false, logicalDag, jobConfig, pluginJarsUrls);
+ }
+
public long getJobId() {
return jobId;
}
+ public boolean isStartWithSavePoint() {
+ return isStartWithSavePoint;
+ }
+
public long getCreateTime() {
return createTime;
}
@@ -86,6 +97,7 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeLong(jobId);
+ out.writeBoolean(isStartWithSavePoint);
out.writeLong(createTime);
IOUtil.writeData(out, logicalDag);
out.writeObject(jobConfig);
@@ -96,6 +108,7 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
@Override
public void readData(ObjectDataInput in) throws IOException {
jobId = in.readLong();
+ isStartWithSavePoint = in.readBoolean();
createTime = in.readLong();
logicalDag = IOUtil.readData(in);
jobConfig = in.readObject();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java
new file mode 100644
index 000000000..5814c63d9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSavePointJobCodec.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.*;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;
+
+/*
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+ */
+/**
+ */
+@Generated("b29b3b6c7451e2940ccd4cd386f32e34")
+public final class SeaTunnelSavePointJobCodec {
+ //hex: 0xDE0A00
+ public static final int REQUEST_MESSAGE_TYPE = 14551552;
+ //hex: 0xDE0A01
+ public static final int RESPONSE_MESSAGE_TYPE = 14551553;
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ private SeaTunnelSavePointJobCodec() {
+ }
+
+ public static ClientMessage encodeRequest(long jobId) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.SavePointJob");
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ /**
+ */
+ public static long decodeRequest(ClientMessage clientMessage) {
+ ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+ ClientMessage.Frame initialFrame = iterator.next();
+ return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ }
+
+ public static ClientMessage encodeResponse() {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ return clientMessage;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 3340dae4f..7b809adb4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -193,4 +193,18 @@ methods:
type: Data
nullable: false
since: 2.0
- doc: ''
\ No newline at end of file
+ doc: ''
+ - id: 10
+ name: savePointJob
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
+ response: {}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index dfc92ce38..6d31bef31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -354,6 +354,19 @@ public class CoordinatorService {
return new PassiveCompletableFuture<>(voidCompletableFuture);
}
+ public PassiveCompletableFuture<Void> savePoint(long jobId){
+ CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
+ if (!runningJobMasterMap.containsKey(jobId)) {
+ Throwable throwable = new Throwable("The jobId: " + jobId + "of savePoint does not exist");
+ logger.warning(throwable);
+ voidCompletableFuture.completeExceptionally(throwable);
+ } else {
+ JobMaster jobMaster = runningJobMasterMap.get(jobId);
+ voidCompletableFuture = jobMaster.savePoint();
+ }
+ return new PassiveCompletableFuture<>(voidCompletableFuture);
+ }
+
private void onJobDone(JobMaster jobMaster, long jobId){
// storage job state and metrics to HistoryStorage
jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2b3aff44b..aba4180f4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -249,7 +249,8 @@ public class TaskExecutionService implements DynamicMetricsProvider {
}
resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
logger.info(
- String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
+ String.format("Task %s complete with state %s", r != null ? r.getTaskGroupLocation() : "null",
+ r != null ? r.getExecutionState() : "null"));
notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), r);
}));
return new PassiveCompletableFuture<>(resultFuture);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 545297292..4d8229377 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.CHECKPOINT_TYPE;
import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
+import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.SAVEPOINT_TYPE;
import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
@@ -329,7 +330,7 @@ public class CheckpointCoordinator {
}
public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
- CompletableFuture<PendingCheckpoint> savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), CheckpointType.SAVEPOINT_TYPE);
+ CompletableFuture<PendingCheckpoint> savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
startTriggerPendingCheckpoint(savepoint);
return savepoint.join().getCompletableFuture();
}
@@ -502,7 +503,7 @@ public class CheckpointCoordinator {
}
pendingCheckpoint.acknowledgeTask(location, ackOperation.getStates(),
- CheckpointType.SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() ?
+ SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() ?
SubtaskStatus.SAVEPOINT_PREPARE_CLOSE :
SubtaskStatus.RUNNING);
}
@@ -564,4 +565,11 @@ public class CheckpointCoordinator {
}
return latestCompletedCheckpoint.getCheckpointType() == COMPLETED_POINT_TYPE;
}
+
+ public boolean isEndOfSavePoint() {
+ if (latestCompletedCheckpoint == null) {
+ return false;
+ }
+ return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index eddca4298..a31766b0c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -78,6 +78,7 @@ public class CheckpointManager {
private final JobMaster jobMaster;
public CheckpointManager(long jobId,
+ boolean isStartWithSavePoint,
NodeEngine nodeEngine,
JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
@@ -96,9 +97,15 @@ public class CheckpointManager {
IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
try {
idCounter.start();
- // TODO: support savepoint
PipelineState pipelineState = null;
- if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
+ if (isStartWithSavePoint){
+ pipelineState = checkpointStorage.getLatestCheckpointByJobIdAndPipelineId(String.valueOf(jobId),
+ String.valueOf(plan.getPipelineId()));
+ long checkpointId = pipelineState.getCheckpointId();
+ idCounter.setCount(checkpointId);
+ log.info("pipeline({}) start with savePoint on checkPointId({})", plan.getPipelineId(), checkpointId);
+ }
+ else if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
pipelineState =
checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()),
String.valueOf(idCounter.get() - 1));
@@ -198,7 +205,7 @@ public class CheckpointManager {
* <br> Listen to the {@link JobStatus} of the {@link Job}.
*/
public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
- if (jobStatus == JobStatus.FINISHED) {
+ if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED) && !isSavePointEnd()) {
checkpointStorage.deleteCheckpoint(jobId + "");
}
return CompletableFuture.completedFuture(null);
@@ -225,6 +232,11 @@ public class CheckpointManager {
coordinator.acknowledgeTask(ackOperation);
}
+ private boolean isSavePointEnd() {
+ return coordinatorMap.values().stream().map(CheckpointCoordinator::isEndOfSavePoint)
+ .reduce((v1, v2) -> v1 && v2).orElse(false);
+ }
+
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3940e2744..35fbe8531 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
+import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
@@ -177,6 +178,7 @@ public class JobMaster {
this.physicalPlan.setJobMaster(this);
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
+ jobImmutableInformation.isStartWithSavePoint(),
nodeEngine,
this,
planTuple.f1(),
@@ -380,6 +382,15 @@ public class JobMaster {
});
}
+ /**
+ * Execute savePoint, which will cause the job to end.
+ */
+ public CompletableFuture<Void> savePoint(){
+ PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
+ checkpointManager.triggerSavepoints();
+ return CompletableFuture.allOf(passiveCompletableFutures);
+ }
+
public Map<TaskGroupLocation, SlotProfile> getOwnedSlotProfiles(PipelineLocation pipelineLocation) {
return ownedSlotProfilesIMap.get(pipelineLocation);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java
new file mode 100644
index 000000000..9b39fd733
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SavePointJobOperation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+
+/**
+ * Asynchronous job operation that asks the coordinator service to execute a
+ * savepoint for the job identified by {@code jobId}.
+ */
+public class SavePointJobOperation extends AbstractJobAsyncOperation {
+
+    /**
+     * No-argument constructor; {@link ClientToServerOperationDataSerializerHook}
+     * instantiates the operation through it.
+     */
+    public SavePointJobOperation() {
+    }
+
+    /**
+     * @param jobId id of the job to savepoint, handed to the parent operation
+     */
+    public SavePointJobOperation(long jobId) {
+        super(jobId);
+    }
+
+    /**
+     * Delegates to the coordinator service of the local {@link SeaTunnelServer}.
+     *
+     * @return the future returned by the coordinator service's {@code savePoint(jobId)}
+     */
+    @Override
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
+        SeaTunnelServer server = getService();
+        return server.getCoordinatorService().savePoint(jobId);
+    }
+
+    /**
+     * @return the class id under which this operation is registered in
+     *         {@link ClientToServerOperationDataSerializerHook}
+     */
+    @Override
+    public int getClassId() {
+        return ClientToServerOperationDataSerializerHook.SAVEPOINT_JOB_OPERATOR;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java
new file mode 100644
index 000000000..a5b17f168
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SavePointJobTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Client message task for the savepoint-job request. The request is decoded and the
+ * response encoded with {@link SeaTunnelSavePointJobCodec}; the work itself is done
+ * by a {@link SavePointJobOperation}.
+ */
+public class SavePointJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+
+    protected SavePointJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(
+            clientMessage,
+            node,
+            connection,
+            SeaTunnelSavePointJobCodec::decodeRequest,
+            // The response carries no payload, so the operation result is not used.
+            ignored -> SeaTunnelSavePointJobCodec.encodeResponse());
+    }
+
+    /**
+     * @return a {@link SavePointJobOperation} built from {@code parameters}
+     *         (presumably the decoded job id; confirm against the parent task)
+     */
+    @Override
+    protected Operation prepareOperation() {
+        return new SavePointJobOperation(parameters);
+    }
+
+    /**
+     * @return the fixed method name {@code "savePointJob"}
+     */
+    @Override
+    public String getMethodName() {
+        return "savePointJob";
+    }
+
+    /**
+     * @return an empty array; no parameters are reported for this task
+     */
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index bed8e64d0..627ea89fa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCod
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
@@ -67,5 +68,7 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
(clientMessage, connection) -> new GetJobMetricsTask(clientMessage, node, connection));
factories.put(SeaTunnelGetJobInfoCodec.REQUEST_MESSAGE_TYPE,
(clientMessage, connection) -> new GetJobInfoTask(clientMessage, node, connection));
+ factories.put(SeaTunnelSavePointJobCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new SavePointJobTask(clientMessage, node, connection));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index be650c531..2ed57972b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
@@ -55,6 +56,8 @@ public final class ClientToServerOperationDataSerializerHook implements DataSeri
public static final int GET_JOB_INFO_OPERATION = 7;
+ public static final int SAVEPOINT_JOB_OPERATOR = 8;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -91,6 +94,8 @@ public final class ClientToServerOperationDataSerializerHook implements DataSeri
return new GetJobDetailStatusOperation();
case GET_JOB_INFO_OPERATION:
return new GetJobInfoOperation();
+ case SAVEPOINT_JOB_OPERATOR:
+ return new SavePointJobOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index af45774a6..e272d539a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -63,4 +63,12 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer
log.error(ExceptionUtils.getMessage(e));
}
}
+
+    /**
+     * Restarts the server for tests that require a cluster restart, by calling
+     * {@code after()} and then {@code before()}.
+     */
+    public void restartServer() {
+        after();
+        before();
+    }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 54a33af7b..bfb1f2125 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -66,6 +66,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
planMap.put(1, CheckpointPlan.builder().pipelineId(1).build());
CheckpointManager checkpointManager = new CheckpointManager(
jobId,
+ false,
nodeEngine,
null,
planMap,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
new file mode 100644
index 000000000..47a7f7624
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static org.awaitility.Awaitility.await;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs a streaming job, stops it with a savepoint, resubmits it with the
+ * start-with-savepoint flag and checks the total number of lines written to
+ * {@link #OUT_PATH}. The scenario is exercised with and without a server restart
+ * between the savepoint and the resubmission.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public class SavePointTest extends AbstractSeaTunnelServerTest {
+    public static final String OUT_PATH = "/tmp/hive/warehouse/test3";
+    public static final String CONF_PATH = "stream_fakesource_to_file_savepoint.conf";
+    public static final long JOB_ID = 823342L;
+
+    /** Upper bound, in milliseconds, for every await in this test. */
+    private static final long AWAIT_TIMEOUT_MS = 120000L;
+
+    @Test
+    @Disabled()
+    public void testSavePoint() throws InterruptedException {
+        savePointAndRestore(false);
+    }
+
+    @Test
+    @Disabled()
+    public void testSavePointOnServerRestart() throws InterruptedException {
+        savePointAndRestore(true);
+    }
+
+    /**
+     * Executes the savepoint-and-restore scenario.
+     *
+     * @param needRestart whether the server is restarted between the savepoint and the restore
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    public void savePointAndRestore(boolean needRestart) throws InterruptedException {
+
+        FileUtils.createNewDir(OUT_PATH);
+
+        // 1 Start a streaming mode job
+        startJob(JOB_ID, CONF_PATH, false);
+
+        // 2 Wait for the job to be running and to start outputting data.
+        //   Two separate assertions so a failure names the condition that was not met.
+        await().atMost(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                Assertions.assertEquals(JobStatus.RUNNING, server.getCoordinatorService().getJobStatus(JOB_ID));
+                Assertions.assertTrue(FileUtils.getFileLineNumberFromDir(OUT_PATH) > 10);
+            });
+
+        // 3 Start savePoint
+        server.getCoordinatorService().savePoint(JOB_ID);
+
+        // 4 Wait for savePoint to complete
+        awaitJobStatus(JobStatus.FINISHED);
+
+        Thread.sleep(1000);
+
+        // Restart server
+        if (needRestart) {
+            this.restartServer();
+        }
+
+        Thread.sleep(1000);
+
+        // 5 Resume from savePoint
+        startJob(JOB_ID, CONF_PATH, true);
+
+        awaitJobStatus(JobStatus.RUNNING);
+
+        // 6 Run long enough to ensure that the data write is complete
+        Thread.sleep(30000);
+
+        server.getCoordinatorService().cancelJob(JOB_ID);
+
+        awaitJobStatus(JobStatus.CANCELED);
+
+        // 7 Check the final data count
+        Assertions.assertEquals(100, FileUtils.getFileLineNumberFromDir(OUT_PATH));
+
+        Thread.sleep(1000);
+    }
+
+    /**
+     * Waits, up to {@link #AWAIT_TIMEOUT_MS}, until the coordinator reports the given
+     * status for {@link #JOB_ID}.
+     */
+    private void awaitJobStatus(JobStatus expected) {
+        await().atMost(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .untilAsserted(() ->
+                Assertions.assertEquals(expected, server.getCoordinatorService().getJobStatus(JOB_ID)));
+    }
+
+    /**
+     * Builds the logical plan from the given config, wraps it in a
+     * {@link JobImmutableInformation} and submits it, blocking until the submit future completes.
+     */
+    private void startJob(long jobId, String path, boolean isStartWithSavePoint) {
+        LogicalDag testLogicalDag =
+            TestUtils.createTestLogicalPlan(path, String.valueOf(jobId), jobId);
+
+        JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId, isStartWithSavePoint,
+            nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+            Collections.emptyList());
+
+        Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> submitFuture =
+            server.getCoordinatorService().submitJob(jobId, data);
+        submitFuture.join();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf
new file mode 100644
index 000000000..ced94b31e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file_savepoint.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set job environment configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path="/tmp/hive/warehouse/test3"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+
+ }
+}
\ No newline at end of file