You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/17 12:24:19 UTC

[incubator-seatunnel] branch dev updated: [Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 07c2a11f1 [Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)
07c2a11f1 is described below

commit 07c2a11f1eccd09aabd39529f138be14c55e7a42
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Oct 17 20:24:13 2022 +0800

    [Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)
---
 .../seatunnel/engine/client/SeaTunnelClient.java   |   4 +-
 .../engine/client/SeaTunnelHazelcastClient.java    |  23 +---
 .../seatunnel/engine/client/job/JobClient.java     |   2 +-
 .../engine/client/job/JobExecutionEnvironment.java |  16 +--
 .../engine/common/config/EngineConfig.java         |  23 +++-
 .../seatunnel/engine/common/config/JobConfig.java  |   6 +
 .../engine/common/config/SeaTunnelConfig.java      |   2 +-
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  91 +++++++++++--
 .../common/config/server/CheckpointConfig.java     |  59 +++++++++
 .../CheckpointStorageConfig.java}                  |  16 +--
 .../common/config/server/ServerConfigOptions.java  |  56 ++++++++
 .../common/config/server/SlotServiceConfig.java}   |  37 +++---
 .../config/YamlSeaTunnelConfigParserTest.java      |  59 +++++++++
 .../src/test/resources/hazelcast-client.yaml       |  37 ++++++
 .../src/test/resources/hazelcast.yaml              |  37 ++++++
 .../src/test/resources/seatunnel.yaml              |  32 +++++
 .../engine/core/parse/JobConfigParser.java         |   4 +
 .../engine/server/CoordinatorService.java          |  21 +--
 .../seatunnel/engine/server/NodeExtension.java     |   2 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  27 ++--
 .../server/checkpoint/CheckpointCoordinator.java   |  16 +--
 .../CheckpointCoordinatorConfiguration.java        | 145 ---------------------
 .../server/checkpoint/CheckpointManager.java       |   9 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  34 ++++-
 .../server/service/slot/DefaultSlotService.java    |  23 ++--
 .../server/checkpoint/CheckpointManagerTest.java   |   7 +-
 26 files changed, 506 insertions(+), 282 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index d89b24570..4d8d6446b 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -27,7 +27,7 @@ import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
 public class SeaTunnelClient implements SeaTunnelClientInstance {
-    private SeaTunnelHazelcastClient hazelcastClient;
+    private final SeaTunnelHazelcastClient hazelcastClient;
 
     public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
         this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
@@ -50,7 +50,7 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
     public String printMessageToMaster(@NonNull String msg) {
         return hazelcastClient.requestOnMasterAndDecodeResponse(
             SeaTunnelPrintMessageCodec.encodeRequest(msg),
-            response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
+            SeaTunnelPrintMessageCodec::decodeResponse
         );
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 3700378aa..e893988b0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -72,11 +72,6 @@ public class SeaTunnelHazelcastClient {
         return requestAndDecodeResponse(masterUuid, request, decoder);
     }
 
-    public <S> S requestOnAnyMemberAndDecodeResponse(@NonNull ClientMessage request,
-                                                     @NonNull Function<ClientMessage, Object> decoder) {
-        return requestAndDecodeResponse(null, request, decoder);
-    }
-
     public <S> S requestAndDecodeResponse(@NonNull UUID uuid,
                                           @NonNull ClientMessage request,
                                           @NonNull Function<ClientMessage, Object> decoder) {
@@ -95,11 +90,11 @@ public class SeaTunnelHazelcastClient {
     public <T> PassiveCompletableFuture<T> requestAndGetCompletableFuture(@NonNull UUID uuid,
                                                                           @NonNull ClientMessage request,
                                                                           @NonNull
-                                                                          ClientMessageDecoder clientMessageDecoder) {
+                                                                          ClientMessageDecoder<?> clientMessageDecoder) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
 
-            return new PassiveCompletableFuture<T>(new ClientDelegatingFuture<>(
+            return new PassiveCompletableFuture<>(new ClientDelegatingFuture<>(
                 invocation.invoke(),
                 serializationService,
                 clientMessageDecoder
@@ -111,31 +106,21 @@ public class SeaTunnelHazelcastClient {
 
     public <T> PassiveCompletableFuture<T> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request,
                                                                                   @NonNull
-                                                                                  ClientMessageDecoder clientMessageDecoder) {
+                                                                                  ClientMessageDecoder<?> clientMessageDecoder) {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
         return requestAndGetCompletableFuture(masterUuid, request, clientMessageDecoder);
     }
 
-    public <T> PassiveCompletableFuture<T> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request,
-                                                                                     @NonNull
-                                                                                     ClientMessageDecoder clientMessageDecoder) {
-        return requestAndGetCompletableFuture(null, request, clientMessageDecoder);
-    }
-
     public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(@NonNull UUID uuid,
                                                                          @NonNull ClientMessage request) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
-            return new PassiveCompletableFuture(invocation.invoke().thenApply(r -> null));
+            return new PassiveCompletableFuture<>(invocation.invoke().thenApply(r -> null));
         } catch (Throwable t) {
             throw ExceptionUtil.rethrow(t);
         }
     }
 
-    public PassiveCompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
-        return requestAndGetCompletableFuture(null, request);
-    }
-
     public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
         return requestAndGetCompletableFuture(masterUuid, request);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index bbcfea8ad..5566e49e2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import lombok.NonNull;
 
 public class JobClient {
-    private SeaTunnelHazelcastClient hazelcastClient;
+    private final SeaTunnelHazelcastClient hazelcastClient;
 
     public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
         this.hazelcastClient = hazelcastClient;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index ca0fb1b12..aa3d457e9 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -37,19 +37,19 @@ import java.util.concurrent.ExecutionException;
 
 public class JobExecutionEnvironment {
 
-    private JobConfig jobConfig;
+    private final JobConfig jobConfig;
 
-    private int maxParallelism = 1;
+    private final int maxParallelism = 1;
 
-    private List<Action> actions = new ArrayList<>();
+    private final List<Action> actions = new ArrayList<>();
 
-    private List<URL> jarUrls = new ArrayList<>();
+    private final List<URL> jarUrls = new ArrayList<>();
 
-    private String jobFilePath;
+    private final String jobFilePath;
 
-    private IdGenerator idGenerator;
+    private final IdGenerator idGenerator;
 
-    private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
 
     private final JobClient jobClient;
 
@@ -81,7 +81,7 @@ public class JobExecutionEnvironment {
 
     public ClientJobProxy execute() throws ExecutionException, InterruptedException {
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
-            Long.valueOf(jobConfig.getJobContext().getJobId()),
+            Long.parseLong(jobConfig.getJobContext().getJobId()),
             seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
             jobConfig,
             jarUrls);
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 88697461e..365e4e887 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -18,19 +18,32 @@
 package org.apache.seatunnel.engine.common.config;
 
 import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 
 import lombok.Data;
 
 @Data
+@SuppressWarnings("checkstyle:MagicNumber")
 public class EngineConfig {
-    private int backupCount;
+    private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
+    private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
+
+    private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue();
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    private int serverExecutorPoolSize = 20;
+    private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();
 
-    public EngineConfig setBackupCount(int newBackupCount) {
+    public void setBackupCount(int newBackupCount) {
         checkBackupCount(newBackupCount, 0);
         this.backupCount = newBackupCount;
-        return this;
     }
+
+    public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) {
+        checkPositive(printExecutionInfoInterval, ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + " must be > 0");
+        this.printExecutionInfoInterval = printExecutionInfoInterval;
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 2fb28b14d..ced337b9b 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -26,12 +26,16 @@ import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import lombok.Data;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 @Data
 public class JobConfig implements IdentifiedDataSerializable {
     private String name;
     private JobContext jobContext;
 
+    private Map<String, Object> envOptions = new HashMap<>();
+
     @Override
     public int getFactoryId() {
         return ConfigDataSerializerHook.FACTORY_ID;
@@ -46,11 +50,13 @@ public class JobConfig implements IdentifiedDataSerializable {
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeString(name);
         out.writeObject(jobContext);
+        out.writeObject(envOptions);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         this.name = in.readString();
         this.jobContext = in.readObject();
+        this.envOptions = in.readObject();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
index 98f6c4732..cfaa5ae65 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -49,7 +49,7 @@ public class SeaTunnelConfig {
     }
 
     /**
-     * Returns the absolute path for seatunnel.home based from the system property
+     * Returns the absolute path for `seatunnel.home` based from the system property
      * {@link SeaTunnelProperties#SEATUNNEL_HOME}
      */
     private static String seatunnelHome() {
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5531db478..39ef2ce0c 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -19,14 +19,24 @@ package org.apache.seatunnel.engine.common.config;
 
 import static com.hazelcast.internal.config.DomConfigHelper.childElements;
 import static com.hazelcast.internal.config.DomConfigHelper.cleanNodeName;
+import static com.hazelcast.internal.config.DomConfigHelper.getBooleanValue;
 import static com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
 
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
+
 import com.hazelcast.config.InvalidConfigurationException;
 import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 import org.w3c.dom.Node;
 
 public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor {
 
+    private static final ILogger LOGGER = Logger.getLogger(YamlSeaTunnelDomConfigProcessor.class);
+
     private final SeaTunnelConfig config;
 
     YamlSeaTunnelDomConfigProcessor(boolean domLevel3, SeaTunnelConfig config) {
@@ -60,20 +70,83 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
         return false;
     }
 
-    @SuppressWarnings("checkstyle:RegexpSinglelineJava")
+    private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) {
+        SlotServiceConfig slotServiceConfig = new SlotServiceConfig();
+        for (Node node : childElements(slotServiceNode)) {
+            String name = cleanNodeName(node);
+            if (ServerConfigOptions.DYNAMIC_SLOT.key().equals(name)) {
+                slotServiceConfig.setDynamicSlot(getBooleanValue(getTextContent(node)));
+            } else if (ServerConfigOptions.SLOT_NUM.key().equals(name)) {
+                slotServiceConfig.setSlotNum(getIntegerValue(ServerConfigOptions.SLOT_NUM.key(), getTextContent(node)));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
+            }
+        }
+        return slotServiceConfig;
+    }
+
     private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
         final EngineConfig engineConfig = config.getEngineConfig();
         for (Node node : childElements(engineNode)) {
             String name = cleanNodeName(node);
-            switch (name) {
-                case "backup-count":
-                    engineConfig.setBackupCount(
-                        getIntegerValue("backup-count", getTextContent(node))
-                    );
-                    break;
-                default:
-                    throw new AssertionError("Unrecognized element: " + name);
+            if (ServerConfigOptions.BACKUP_COUNT.key().equals(name)) {
+                engineConfig.setBackupCount(
+                    getIntegerValue(ServerConfigOptions.BACKUP_COUNT.key(), getTextContent(node))
+                );
+            } else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
+                engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
+                    getTextContent(node)));
+            } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
+                engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
+            } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
+                engineConfig.setCheckpointConfig(parseCheckpointConfig(node));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
             }
         }
     }
+
+    private CheckpointConfig parseCheckpointConfig(Node checkpointNode) {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        for (Node node : childElements(checkpointNode)) {
+            String name = cleanNodeName(node);
+            if (ServerConfigOptions.CHECKPOINT_INTERVAL.key().equals(name)) {
+                checkpointConfig.setCheckpointInterval(
+                    getIntegerValue(ServerConfigOptions.CHECKPOINT_INTERVAL.key(), getTextContent(node))
+                );
+            } else if (ServerConfigOptions.CHECKPOINT_TIMEOUT.key().equals(name)) {
+                checkpointConfig.setCheckpointTimeout(getIntegerValue(ServerConfigOptions.CHECKPOINT_TIMEOUT.key(),
+                    getTextContent(node)));
+            } else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
+                checkpointConfig.setMaxConcurrentCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
+                    getTextContent(node)));
+            } else if (ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) {
+                checkpointConfig.setTolerableFailureCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(),
+                    getTextContent(node)));
+            } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
+                checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
+            }
+        }
+
+        return checkpointConfig;
+    }
+
+    private CheckpointStorageConfig parseCheckpointStorageConfig(Node checkpointStorageConfigNode) {
+        CheckpointStorageConfig checkpointStorageConfig = new CheckpointStorageConfig();
+        for (Node node : childElements(checkpointStorageConfigNode)) {
+            String name = cleanNodeName(node);
+            if (ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.key().equals(name)) {
+                checkpointStorageConfig.setStorage(getTextContent(node));
+            } else if (ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) {
+                checkpointStorageConfig.setMaxRetainedCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(),
+                    getTextContent(node)));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
+            }
+        }
+        return checkpointStorageConfig;
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
new file mode 100644
index 000000000..75917b8b8
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@SuppressWarnings("checkstyle:MagicNumber")
+public class CheckpointConfig implements Serializable {
+
+    public static final long MINIMAL_CHECKPOINT_TIME = 10;
+
+    private long checkpointInterval = ServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
+    private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
+    private int maxConcurrentCheckpoints = ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();
+    private int tolerableFailureCheckpoints = ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue();
+
+    private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
+
+    public void setCheckpointInterval(long checkpointInterval) {
+        checkArgument(checkpointInterval >= MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 milliseconds.");
+        this.checkpointInterval = checkpointInterval;
+    }
+
+    public void setCheckpointTimeout(long checkpointTimeout) {
+        checkArgument(checkpointTimeout >= MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 milliseconds.");
+        this.checkpointTimeout = checkpointTimeout;
+    }
+
+    public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+        checkArgument(maxConcurrentCheckpoints >= 1, "The minimum number of concurrent checkpoints is 1.");
+        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+    }
+
+    public void setTolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
+        checkArgument(tolerableFailureCheckpoints >= 0, "The number of tolerance failed checkpoints must be a natural number.");
+        this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
index 88697461e..18f915c88 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
@@ -15,22 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.common.config;
-
-import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+package org.apache.seatunnel.engine.common.config.server;
 
 import lombok.Data;
 
 @Data
-public class EngineConfig {
-    private int backupCount;
+public class CheckpointStorageConfig {
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    private int serverExecutorPoolSize = 20;
+    private String storage = ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
 
-    public EngineConfig setBackupCount(int newBackupCount) {
-        checkBackupCount(newBackupCount, 0);
-        this.backupCount = newBackupCount;
-        return this;
-    }
+    private int maxRetainedCheckpoints = ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue();
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
new file mode 100644
index 000000000..cde0901ac
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@SuppressWarnings("checkstyle:MagicNumber")
+public class ServerConfigOptions {
+
+    public static final Option<Integer> BACKUP_COUNT = Options.key("backup-count").intType().defaultValue(1).withDescription("The number of backup copies of each partition.");
+
+    public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task.");
+
+    public static final Option<Boolean> DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot.");
+
+    public static final Option<Integer> SLOT_NUM = Options.key("slot-num").intType().defaultValue(2).withDescription("The number of slots. Only valid when dynamic slot is disabled.");
+
+    public static final Option<Integer> CHECKPOINT_INTERVAL = Options.key("interval").intType().defaultValue(300000).withDescription("The interval (in milliseconds) between two consecutive checkpoints.");
+
+    public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(300000).withDescription("The timeout (in milliseconds) for a checkpoint.");
+
+    public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT = Options.key("max-concurrent").intType().defaultValue(1).withDescription("The maximum number of concurrent checkpoints.");
+
+    public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILURE = Options.key("tolerable-failure").intType().defaultValue(0).withDescription("The tolerable failure number of a checkpoint.");
+
+    public static final Option<String> CHECKPOINT_STORAGE_TYPE = Options.key("type").stringType().defaultValue("localfile").withDescription("The checkpoint storage type.");
+
+    public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED = Options.key("max-retained").intType().defaultValue(1).withDescription("The maximum number of retained checkpoints.");
+
+    public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE = Options.key("storage").type(new TypeReference<CheckpointStorageConfig>() {
+    }).defaultValue(new CheckpointStorageConfig()).withDescription("The checkpoint storage configuration.");
+
+    public static final Option<SlotServiceConfig> SLOT_SERVICE = Options.key("slot-service").type(new TypeReference<SlotServiceConfig>() {
+    }).defaultValue(new SlotServiceConfig()).withDescription("The slot service configuration.");
+
+    public static final Option<CheckpointConfig> CHECKPOINT = Options.key("checkpoint").type(new TypeReference<CheckpointConfig>() {
+    }).defaultValue(new CheckpointConfig()).withDescription("The checkpoint configuration.");
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
similarity index 62%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
index 4f108d608..1160d9d24 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.checkpoint;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-
-@Getter
-@EqualsAndHashCode
-@Builder(builderClassName = "Builder")
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class CheckpointStorageConfiguration {
-    private final String storage;
-    private final int maxRetainedCheckpoints;
-
-    public static final class Builder {
-        private String storage = "localfile";
-        private int maxRetainedCheckpoints = 1;
+package org.apache.seatunnel.engine.common.config.server;
+
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SlotServiceConfig implements Serializable {
+    /** Dynamic-slot flag; starts at the default declared by ServerConfigOptions.DYNAMIC_SLOT. */
+    private boolean dynamicSlot = ServerConfigOptions.DYNAMIC_SLOT.defaultValue();
+    /** Slot count; starts at the default declared by ServerConfigOptions.SLOT_NUM. */
+    private int slotNum = ServerConfigOptions.SLOT_NUM.defaultValue();
+    /** Stores the slot count after Hazelcast's checkPositive has validated it; the message names the option key. */
+    public void setSlotNum(int slotNum) {
+        checkPositive(slotNum, ServerConfigOptions.SLOT_NUM.key() + " must be > 0");
+        this.slotNum = slotNum;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
new file mode 100644
index 000000000..f4d507e85
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class YamlSeaTunnelConfigParserTest {
+    /** Builds a SeaTunnelConfig from the located YAML file and checks each engine value against the test seatunnel.yaml. */
+    @Test
+    public void testSeaTunnelConfig() {
+        YamlSeaTunnelConfigLocator yamlConfigLocator = new YamlSeaTunnelConfigLocator();
+        SeaTunnelConfig config;
+        if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+            // Load the YAML config found in the working directory or on the classpath
+            config = new YamlSeaTunnelConfigBuilder(yamlConfigLocator).setProperties(null).build();
+        } else {
+            throw new RuntimeException("can't find yaml in resources");
+        }
+        Assertions.assertNotNull(config);
+
+        Assertions.assertEquals(1, config.getEngineConfig().getBackupCount());
+
+        Assertions.assertEquals(2, config.getEngineConfig().getPrintExecutionInfoInterval());
+
+        Assertions.assertFalse(config.getEngineConfig().getSlotServiceConfig().isDynamicSlot());
+
+        Assertions.assertEquals(5, config.getEngineConfig().getSlotServiceConfig().getSlotNum());
+
+        Assertions.assertEquals(6000, config.getEngineConfig().getCheckpointConfig().getCheckpointInterval());
+
+        Assertions.assertEquals(7000, config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());
+
+        Assertions.assertEquals(5, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());
+
+        Assertions.assertEquals(2, config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());
+
+        Assertions.assertEquals("test", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());
+
+        Assertions.assertEquals(3, config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints());
+
+    }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml
new file mode 100644
index 000000000..9552c382e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast-client:
+  cluster-name: seatunnel
+
+  network:
+    cluster-members:
+      - localhost:5801
+      - localhost:5802
+      - localhost:5803
+      - localhost:5804
+      - localhost:5805
+      - localhost:5806
+      - localhost:5807
+      - localhost:5808
+      - localhost:5809
+      - localhost:5810
+      - localhost:5811
+      - localhost:5812
+      - localhost:5813
+      - localhost:5814
+      - localhost:5815
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml
new file mode 100644
index 000000000..21f4d544d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - localhost
+    port:
+      auto-increment: true
+      port-count: 100
+      port: 5801
+  map:
+    map-name-template:
+      map-store:
+        enabled: true
+        initial-mode: EAGER
+        class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
+        properties:
+          path: /tmp/file-store-map
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
new file mode 100644
index 000000000..4a2e85121
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+    engine:
+        backup-count: 1
+        print-execution-info-interval: 2
+        slot-service:
+            dynamic-slot: false
+            slot-num: 5
+        checkpoint:
+            interval: 6000
+            timeout: 7000
+            max-concurrent: 5
+            tolerable-failure: 2
+            storage:
+                type: test
+                max-retained: 3
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index e817ed490..1787eda0a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -117,6 +118,9 @@ public class JobConfigParser {
         } else {
             jobConfig.getJobContext().setJobMode(JobMode.BATCH);
         }
+        if (envConfigs.hasPath("checkpoint.interval")) {
+            jobConfig.getEnvOptions().put(ServerConfigOptions.CHECKPOINT_INTERVAL.key(), envConfigs.getInt("checkpoint.interval"));
+        }
     }
 
     /**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 78ec4158d..8c2bb030e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -56,13 +57,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class CoordinatorService {
-    private NodeEngineImpl nodeEngine;
+    private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
 
     private volatile ResourceManager resourceManager;
 
     /**
-     * IMap key is jobId and value is a Tuple2
+     * IMap key is jobId and value is a Tuple2.
      * Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job
      * <p>
      * This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node active
@@ -118,16 +119,19 @@ public class CoordinatorService {
 
     private ScheduledExecutorService masterActiveListener;
 
+    private final EngineConfig engineConfig;
+
     @SuppressWarnings("checkstyle:MagicNumber")
-    public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer) {
+    public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) {
         this.nodeEngine = nodeEngine;
         this.logger = nodeEngine.getLogger(getClass());
         this.executorService =
             Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                 .setNameFormat("seatunnel-coordinator-service-%d").build());
         this.seaTunnelServer = seaTunnelServer;
+        this.engineConfig = engineConfig;
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
-        masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+        masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
     }
 
     public JobMaster getJobMaster(Long jobId) {
@@ -184,7 +188,8 @@ public class CoordinatorService {
                 getResourceManager(),
                 runningJobStateIMap,
                 runningJobStateTimestampsIMap,
-                ownedSlotProfilesIMap);
+                ownedSlotProfilesIMap,
+                engineConfig);
 
         try {
             jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
@@ -308,7 +313,7 @@ public class CoordinatorService {
             getResourceManager(),
             runningJobStateIMap,
             runningJobStateTimestampsIMap,
-            ownedSlotProfilesIMap);
+            ownedSlotProfilesIMap, engineConfig);
         executorService.submit(() -> {
             try {
                 runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
@@ -330,7 +335,7 @@ public class CoordinatorService {
                 runningJobMasterMap.remove(jobId);
             }
         });
-        return new PassiveCompletableFuture(voidCompletableFuture);
+        return new PassiveCompletableFuture<>(voidCompletableFuture);
     }
 
     private void removeJobIMap(JobMaster jobMaster) {
@@ -412,8 +417,6 @@ public class CoordinatorService {
 
     /**
      * return true if this node is a master node and the coordinator service init finished.
-     *
-     * @return
      */
     public boolean isCoordinatorActive() {
         return isActive;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 311673271..e857d9bd8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -31,7 +31,7 @@ public class NodeExtension extends DefaultNodeExtension {
 
     public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         super(node);
-        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node, seaTunnelConfig));
+        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 269fd3ed5..397f60434 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
-import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.services.ManagedService;
 import com.hazelcast.internal.services.MembershipAwareService;
 import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -46,11 +45,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+
     private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
+
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
 
     private NodeEngineImpl nodeEngine;
-    private final ILogger logger;
     private final LiveOperationRegistry liveOperationRegistry;
 
     private volatile SlotService slotService;
@@ -62,11 +62,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     private volatile boolean isRunning = true;
 
-    public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
-        this.logger = node.getLogger(getClass());
+    public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
         this.liveOperationRegistry = new LiveOperationRegistry();
         this.seaTunnelConfig = seaTunnelConfig;
-        logger.info("SeaTunnel server start...");
+        LOGGER.info("SeaTunnel server start...");
     }
 
     /**
@@ -76,7 +75,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         if (slotService == null) {
             synchronized (this) {
                 if (slotService == null) {
-                    SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, true, 2);
+                    SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                     service.init();
                     slotService = service;
                 }
@@ -95,9 +94,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         );
         taskExecutionService.start();
         getSlotService();
-        coordinatorService = new CoordinatorService(nodeEngine, this);
+        coordinatorService = new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
         monitorService = Executors.newSingleThreadScheduledExecutor();
-        monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
+        monitorService.scheduleAtFixedRate(this::printExecutionInfo, 0, seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(), TimeUnit.SECONDS);
     }
 
     @Override
@@ -134,7 +133,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
                 this.getCoordinatorService().memberRemoved(event);
             }
         } catch (SeaTunnelEngineException e) {
-            logger.severe("Error when handle member removed event", e);
+            LOGGER.severe("Error when handle member removed event", e);
         }
     }
 
@@ -147,7 +146,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      * Used for debugging on call
      */
     public String printMessage(String message) {
-        this.logger.info(nodeEngine.getThisAddress() + ":" + message);
+        LOGGER.info(nodeEngine.getThisAddress() + ":" + message);
         return message;
     }
 
@@ -162,7 +161,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
             // TODO the retry count and sleep time need configurable
             while (!coordinatorService.isCoordinatorActive() && retryCount < 20 && isRunning) {
                 try {
-                    logger.warning("This is master node, waiting the coordinator service init finished");
+                    LOGGER.warning("This is master node, waiting the coordinator service init finished");
                     Thread.sleep(1000);
                     retryCount++;
                 } catch (InterruptedException e) {
@@ -187,17 +186,13 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      * return whether task is end
      *
      * @param taskGroupLocation taskGroupLocation
-     * @return
      */
     public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
         IMap<Object, Object> runningJobState =
             nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
-        if (runningJobState == null) {
-            return false;
-        }
 
         Object taskState = runningJobState.get(taskGroupLocation);
-        return taskState == null ? false : ((ExecutionState) taskState).isEndState();
+        return taskState != null && ((ExecutionState) taskState).isEndState();
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index df0e06122..557c365b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
 import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -82,8 +83,6 @@ public class CheckpointCoordinator {
 
     private final CheckpointStorage checkpointStorage;
 
-    private final CheckpointStorageConfiguration storageConfig;
-
     @Getter
     private final CheckpointIDCounter checkpointIdCounter;
 
@@ -106,7 +105,7 @@ public class CheckpointCoordinator {
 
     private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
 
-    private final CheckpointCoordinatorConfiguration coordinatorConfig;
+    private final CheckpointConfig coordinatorConfig;
 
     private int tolerableFailureCheckpoints;
     private final transient ScheduledExecutorService scheduler;
@@ -123,22 +122,21 @@ public class CheckpointCoordinator {
     @SneakyThrows
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
-                                 CheckpointStorageConfiguration storageConfig,
+                                 CheckpointConfig checkpointConfig,
                                  long jobId,
                                  CheckpointPlan plan,
-                                 CheckpointCoordinatorConfiguration coordinatorConfig,
                                  CheckpointIDCounter checkpointIdCounter,
                                  PipelineState pipelineState) {
+
         this.checkpointManager = manager;
         this.checkpointStorage = checkpointStorage;
-        this.storageConfig = storageConfig;
         this.jobId = jobId;
         this.pipelineId = plan.getPipelineId();
         this.plan = plan;
-        this.coordinatorConfig = coordinatorConfig;
+        this.coordinatorConfig = checkpointConfig;
         this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints();
         this.pendingCheckpoints = new ConcurrentHashMap<>();
-        this.completedCheckpoints = new ArrayDeque<>(storageConfig.getMaxRetainedCheckpoints() + 1);
+        this.completedCheckpoints = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
         this.scheduler = Executors.newScheduledThreadPool(
             1, runnable -> {
                 Thread thread = new Thread(runnable);
@@ -419,7 +417,7 @@ public class CheckpointCoordinator {
                 .pipelineId(pipelineId)
                 .states(states)
                 .build());
-            if (completedCheckpoints.size() > storageConfig.getMaxRetainedCheckpoints()) {
+            if (completedCheckpoints.size() > coordinatorConfig.getStorage().getMaxRetainedCheckpoints()) {
                 CompletedCheckpoint superfluous = completedCheckpoints.removeFirst();
                 checkpointStorage.deleteCheckpoint(
                     String.valueOf(superfluous.getJobId()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
deleted file mode 100644
index 257bdbe6a..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.checkpoint;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-public class CheckpointCoordinatorConfiguration implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    public static final long MINIMAL_CHECKPOINT_TIME = 10;
-
-    private final long checkpointInterval;
-
-    private final long checkpointTimeout;
-
-    private final int maxConcurrentCheckpoints;
-
-    private final int tolerableFailureCheckpoints;
-
-    private CheckpointCoordinatorConfiguration(long checkpointInterval,
-                                               long checkpointTimeout,
-                                               int maxConcurrentCheckpoints,
-                                               int tolerableFailureCheckpoints) {
-        this.checkpointInterval = checkpointInterval;
-        this.checkpointTimeout = checkpointTimeout;
-        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-        this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
-    }
-
-    public long getCheckpointInterval() {
-        return checkpointInterval;
-    }
-
-    public long getCheckpointTimeout() {
-        return checkpointTimeout;
-    }
-
-    public int getMaxConcurrentCheckpoints() {
-        return maxConcurrentCheckpoints;
-    }
-
-    public int getTolerableFailureCheckpoints() {
-        return tolerableFailureCheckpoints;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        CheckpointCoordinatorConfiguration that = (CheckpointCoordinatorConfiguration) o;
-        return checkpointInterval == that.checkpointInterval
-            && checkpointTimeout == that.checkpointTimeout
-            && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints
-            && tolerableFailureCheckpoints == that.tolerableFailureCheckpoints;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-            checkpointInterval,
-            checkpointTimeout,
-            maxConcurrentCheckpoints,
-            tolerableFailureCheckpoints);
-    }
-
-    @Override
-    public String toString() {
-        return "CheckpointCoordinatorConfiguration{" +
-            "checkpointInterval=" + checkpointInterval +
-            ", checkpointTimeout=" + checkpointTimeout +
-            ", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
-            ", tolerableFailureCheckpoints=" + tolerableFailureCheckpoints +
-            '}';
-    }
-
-    public static CheckpointCoordinatorConfiguration.Builder builder() {
-        return new Builder();
-    }
-
-    @SuppressWarnings("MagicNumber")
-    public static final class Builder {
-        // TODO 5000 is for test, we can update checkpointInterval to 300000 after we support it read from job config
-        private long checkpointInterval = 5000;
-        private long checkpointTimeout = 300000;
-        private int maxConcurrentCheckpoints = 1;
-        private int tolerableFailureCheckpoints = 0;
-
-        private Builder() {
-        }
-
-        public Builder checkpointInterval(long checkpointInterval) {
-            checkArgument(checkpointInterval < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 mills.");
-            this.checkpointInterval = checkpointInterval;
-            return this;
-        }
-
-        public Builder checkpointTimeout(long checkpointTimeout) {
-            checkArgument(checkpointTimeout < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 mills.");
-            this.checkpointTimeout = checkpointTimeout;
-            return this;
-        }
-
-        public Builder maxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
-            checkArgument(maxConcurrentCheckpoints < 1, "The minimum number of concurrent checkpoints is 1.");
-            this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-            return this;
-        }
-
-        public Builder tolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
-            checkArgument(maxConcurrentCheckpoints < 0, "The number of tolerance failed checkpoints must be a natural number.");
-            this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
-            return this;
-        }
-
-        public CheckpointCoordinatorConfiguration build() {
-            return new CheckpointCoordinatorConfiguration(
-                checkpointInterval,
-                checkpointTimeout,
-                maxConcurrentCheckpoints,
-                tolerableFailureCheckpoints);
-        }
-    }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 5286c1aae..fd48f7e65 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -79,12 +80,11 @@ public class CheckpointManager {
     public CheckpointManager(long jobId,
                              NodeEngine nodeEngine,
                              Map<Integer, CheckpointPlan> checkpointPlanMap,
-                             CheckpointCoordinatorConfiguration coordinatorConfig,
-                             CheckpointStorageConfiguration storageConfig) throws CheckpointStorageException {
+                             CheckpointConfig checkpointConfig) throws CheckpointStorageException {
         this.jobId = jobId;
         this.nodeEngine = nodeEngine;
         this.subtaskWithAddresses = new HashMap<>();
-        this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, storageConfig.getStorage())
+        this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
             .create(new HashMap<>());
         IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
         this.coordinatorMap = checkpointPlanMap.values().parallelStream()
@@ -99,10 +99,9 @@ public class CheckpointManager {
                     }
                     return new CheckpointCoordinator(this,
                         checkpointStorage,
-                        storageConfig,
+                        checkpointConfig,
                         jobId,
                         plan,
-                        coordinatorConfig,
                         idCounter,
                         pipelineState);
                 } catch (Exception e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index b17bdc705..2df8df9c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -21,15 +21,17 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorConfiguration;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
@@ -94,13 +96,15 @@ public class JobMaster extends Thread {
 
     private volatile boolean restore = false;
 
+    private final EngineConfig engineConfig;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
                      @NonNull ResourceManager resourceManager,
                      @NonNull IMap runningJobStateIMap,
                      @NonNull IMap runningJobStateTimestampsIMap,
-                     @NonNull IMap ownedSlotProfilesIMap) {
+                     @NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
@@ -110,6 +114,7 @@ public class JobMaster extends Thread {
         this.resourceManager = resourceManager;
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+        this.engineConfig = engineConfig;
     }
 
     public void init(long initializationTimestamp) throws Exception {
@@ -129,6 +134,9 @@ public class JobMaster extends Thread {
         } else {
             logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
         }
+
+        CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig().getEnvOptions());
+
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
             jobImmutableInformation,
@@ -144,9 +152,41 @@ public class JobMaster extends Thread {
             jobImmutableInformation.getJobId(),
             nodeEngine,
             planTuple.f1(),
-            // TODO: checkpoint config
-            CheckpointCoordinatorConfiguration.builder().build(),
-            CheckpointStorageConfiguration.builder().build());
+            checkpointConfig);
+    }
+
+    // TODO replace it once ReadableConfig supports parsing the yaml format, then use a single config to
+    // read both the engine and the env options.
+    /**
+     * Builds the effective checkpoint configuration for this job from the engine-level configuration
+     * and the job's env options.
+     *
+     * <p>Only the checkpoint interval can be overridden by the job env; when the env does not provide it,
+     * the engine-level interval is used. Timeout, tolerable failures, max concurrent checkpoints and the
+     * storage settings are always copied from the engine configuration.
+     *
+     * @param engine engine-level checkpoint configuration; it is read but never modified
+     * @param env    env options of the job, keyed by option name
+     * @return a new {@link CheckpointConfig} holding the merged values
+     */
+    private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine, Map<String, Object> env) {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        // Seed with the engine-level interval so that it is not lost when the job env has no override.
+        checkpointConfig.setCheckpointInterval(engine.getCheckpointInterval());
+        Object envInterval = env.get(ServerConfigOptions.CHECKPOINT_INTERVAL.key());
+        if (envInterval != null) {
+            // NOTE(review): assumes the job config parser yields an Integer for this key - confirm.
+            checkpointConfig.setCheckpointInterval((Integer) envInterval);
+        }
+        checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
+        checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
+        checkpointConfig.setMaxConcurrentCheckpoints(engine.getMaxConcurrentCheckpoints());
+        // Copy the storage settings into a fresh object so the engine-level instance is not shared with the job.
+        CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
+        storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
+        storageConfig.setStorage(engine.getStorage().getStorage());
+        checkpointConfig.setStorage(storageConfig);
+        return checkpointConfig;
     }
 
     public void initStateFuture() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index fc9a06d30..e85cff0b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.service.slot;
 
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -60,19 +61,17 @@ public class DefaultSlotService implements SlotService {
 
     private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
     private ScheduledExecutorService scheduledExecutorService;
-    private final boolean dynamicSlot;
-    private final int slotNumber;
+    private final SlotServiceConfig config;
     private volatile boolean initStatus;
     private final IdGenerator idGenerator;
     private final TaskExecutionService taskExecutionService;
     private ConcurrentMap<Integer, SlotContext> contexts;
 
-    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot,
-                              int slotNumber) {
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService,
+                              SlotServiceConfig config) {
         this.nodeEngine = nodeEngine;
-        this.dynamicSlot = dynamicSlot;
+        this.config = config;
         this.taskExecutionService = taskExecutionService;
-        this.slotNumber = slotNumber;
         this.idGenerator = new IdGenerator();
     }
 
@@ -86,7 +85,7 @@ public class DefaultSlotService implements SlotService {
         assignedResource = new AtomicReference<>(new ResourceProfile());
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
             String.format("hz.%s.seaTunnel.slotService.thread", nodeEngine.getHazelcastInstance().getName())));
-        if (!dynamicSlot) {
+        if (!config.isDynamicSlot()) {
             initFixedSlots();
         }
         unassignedResource.set(getNodeResource());
@@ -154,7 +153,7 @@ public class DefaultSlotService implements SlotService {
         assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
         unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
         profile.unassigned();
-        if (!dynamicSlot) {
+        if (!config.isDynamicSlot()) {
             unassignedSlots.put(profile.getSlotID(), profile);
         }
         assignedSlots.remove(profile.getSlotID());
@@ -169,10 +168,10 @@ public class DefaultSlotService implements SlotService {
     }
 
     private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
-        if (unassignedSlots.isEmpty() && !dynamicSlot) {
+        if (unassignedSlots.isEmpty() && !config.isDynamicSlot()) {
             return null;
         }
-        if (dynamicSlot) {
+        if (config.isDynamicSlot()) {
             if (unassignedResource.get().enoughThan(profile)) {
                 return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile);
             }
@@ -196,9 +195,9 @@ public class DefaultSlotService implements SlotService {
 
     private void initFixedSlots() {
         long maxMemory = Runtime.getRuntime().maxMemory();
-        for (int i = 0; i < slotNumber; i++) {
+        for (int i = 0; i < config.getSlotNum(); i++) {
             unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
-                new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+                    new ResourceProfile(CPU.of(0), Memory.of(maxMemory / config.getSlotNum()))));
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 1801def2b..0ee8bbd80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
@@ -45,7 +47,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
     public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException {
         long jobId = (long) (Math.random() * 1000000L);
         CheckpointStorage checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
-                CheckpointStorageConfiguration.builder().build().getStorage())
+                new CheckpointStorageConfig().getStorage())
             .create(new HashMap<>());
         CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobId, 1, 1,
             Instant.now().toEpochMilli(),
@@ -63,8 +65,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
             jobId,
             nodeEngine,
             planMap,
-            CheckpointCoordinatorConfiguration.builder().build(),
-            CheckpointStorageConfiguration.builder().build());
+            new CheckpointConfig());
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
         CompletableFuture<Void> future = checkpointManager.listenPipeline(1, org.apache.seatunnel.engine.core.job.PipelineState.FINISHED);
         future.join();