You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/17 12:24:19 UTC
[incubator-seatunnel] branch dev updated: [Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 07c2a11f1 [Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)
07c2a11f1 is described below
commit 07c2a11f1eccd09aabd39529f138be14c55e7a42
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Oct 17 20:24:13 2022 +0800
[Engine] [ConfigFile] Add Config for Slot Service and Checkpoint (#3061)
---
.../seatunnel/engine/client/SeaTunnelClient.java | 4 +-
.../engine/client/SeaTunnelHazelcastClient.java | 23 +---
.../seatunnel/engine/client/job/JobClient.java | 2 +-
.../engine/client/job/JobExecutionEnvironment.java | 16 +--
.../engine/common/config/EngineConfig.java | 23 +++-
.../seatunnel/engine/common/config/JobConfig.java | 6 +
.../engine/common/config/SeaTunnelConfig.java | 2 +-
.../config/YamlSeaTunnelDomConfigProcessor.java | 91 +++++++++++--
.../common/config/server/CheckpointConfig.java | 59 +++++++++
.../CheckpointStorageConfig.java} | 16 +--
.../common/config/server/ServerConfigOptions.java | 56 ++++++++
.../common/config/server/SlotServiceConfig.java} | 37 +++---
.../config/YamlSeaTunnelConfigParserTest.java | 59 +++++++++
.../src/test/resources/hazelcast-client.yaml | 37 ++++++
.../src/test/resources/hazelcast.yaml | 37 ++++++
.../src/test/resources/seatunnel.yaml | 32 +++++
.../engine/core/parse/JobConfigParser.java | 4 +
.../engine/server/CoordinatorService.java | 21 +--
.../seatunnel/engine/server/NodeExtension.java | 2 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 27 ++--
.../server/checkpoint/CheckpointCoordinator.java | 16 +--
.../CheckpointCoordinatorConfiguration.java | 145 ---------------------
.../server/checkpoint/CheckpointManager.java | 9 +-
.../seatunnel/engine/server/master/JobMaster.java | 34 ++++-
.../server/service/slot/DefaultSlotService.java | 23 ++--
.../server/checkpoint/CheckpointManagerTest.java | 7 +-
26 files changed, 506 insertions(+), 282 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index d89b24570..4d8d6446b 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -27,7 +27,7 @@ import com.hazelcast.logging.ILogger;
import lombok.NonNull;
public class SeaTunnelClient implements SeaTunnelClientInstance {
- private SeaTunnelHazelcastClient hazelcastClient;
+ private final SeaTunnelHazelcastClient hazelcastClient;
public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
@@ -50,7 +50,7 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
public String printMessageToMaster(@NonNull String msg) {
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelPrintMessageCodec.encodeRequest(msg),
- response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
+ SeaTunnelPrintMessageCodec::decodeResponse
);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 3700378aa..e893988b0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -72,11 +72,6 @@ public class SeaTunnelHazelcastClient {
return requestAndDecodeResponse(masterUuid, request, decoder);
}
- public <S> S requestOnAnyMemberAndDecodeResponse(@NonNull ClientMessage request,
- @NonNull Function<ClientMessage, Object> decoder) {
- return requestAndDecodeResponse(null, request, decoder);
- }
-
public <S> S requestAndDecodeResponse(@NonNull UUID uuid,
@NonNull ClientMessage request,
@NonNull Function<ClientMessage, Object> decoder) {
@@ -95,11 +90,11 @@ public class SeaTunnelHazelcastClient {
public <T> PassiveCompletableFuture<T> requestAndGetCompletableFuture(@NonNull UUID uuid,
@NonNull ClientMessage request,
@NonNull
- ClientMessageDecoder clientMessageDecoder) {
+ ClientMessageDecoder<?> clientMessageDecoder) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
- return new PassiveCompletableFuture<T>(new ClientDelegatingFuture<>(
+ return new PassiveCompletableFuture<>(new ClientDelegatingFuture<>(
invocation.invoke(),
serializationService,
clientMessageDecoder
@@ -111,31 +106,21 @@ public class SeaTunnelHazelcastClient {
public <T> PassiveCompletableFuture<T> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request,
@NonNull
- ClientMessageDecoder clientMessageDecoder) {
+ ClientMessageDecoder<?> clientMessageDecoder) {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
return requestAndGetCompletableFuture(masterUuid, request, clientMessageDecoder);
}
- public <T> PassiveCompletableFuture<T> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request,
- @NonNull
- ClientMessageDecoder clientMessageDecoder) {
- return requestAndGetCompletableFuture(null, request, clientMessageDecoder);
- }
-
public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(@NonNull UUID uuid,
@NonNull ClientMessage request) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
- return new PassiveCompletableFuture(invocation.invoke().thenApply(r -> null));
+ return new PassiveCompletableFuture<>(invocation.invoke().thenApply(r -> null));
} catch (Throwable t) {
throw ExceptionUtil.rethrow(t);
}
}
- public PassiveCompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
- return requestAndGetCompletableFuture(null, request);
- }
-
public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
return requestAndGetCompletableFuture(masterUuid, request);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index bbcfea8ad..5566e49e2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import lombok.NonNull;
public class JobClient {
- private SeaTunnelHazelcastClient hazelcastClient;
+ private final SeaTunnelHazelcastClient hazelcastClient;
public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
this.hazelcastClient = hazelcastClient;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index ca0fb1b12..aa3d457e9 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -37,19 +37,19 @@ import java.util.concurrent.ExecutionException;
public class JobExecutionEnvironment {
- private JobConfig jobConfig;
+ private final JobConfig jobConfig;
- private int maxParallelism = 1;
+ private final int maxParallelism = 1;
- private List<Action> actions = new ArrayList<>();
+ private final List<Action> actions = new ArrayList<>();
- private List<URL> jarUrls = new ArrayList<>();
+ private final List<URL> jarUrls = new ArrayList<>();
- private String jobFilePath;
+ private final String jobFilePath;
- private IdGenerator idGenerator;
+ private final IdGenerator idGenerator;
- private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+ private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private final JobClient jobClient;
@@ -81,7 +81,7 @@ public class JobExecutionEnvironment {
public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
- Long.valueOf(jobConfig.getJobContext().getJobId()),
+ Long.parseLong(jobConfig.getJobContext().getJobId()),
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
jarUrls);
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 88697461e..365e4e887 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -18,19 +18,32 @@
package org.apache.seatunnel.engine.common.config;
import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import lombok.Data;
@Data
+@SuppressWarnings("checkstyle:MagicNumber")
public class EngineConfig {
- private int backupCount;
+ private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
+ private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
+
+ private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue();
- @SuppressWarnings("checkstyle:MagicNumber")
- private int serverExecutorPoolSize = 20;
+ private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();
- public EngineConfig setBackupCount(int newBackupCount) {
+ public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
- return this;
}
+
+ public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) {
+ checkPositive(printExecutionInfoInterval, ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + " must be > 0");
+ this.printExecutionInfoInterval = printExecutionInfoInterval;
+ }
+
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 2fb28b14d..ced337b9b 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -26,12 +26,16 @@ import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import lombok.Data;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
@Data
public class JobConfig implements IdentifiedDataSerializable {
private String name;
private JobContext jobContext;
+ private Map<String, Object> envOptions = new HashMap<>();
+
@Override
public int getFactoryId() {
return ConfigDataSerializerHook.FACTORY_ID;
@@ -46,11 +50,13 @@ public class JobConfig implements IdentifiedDataSerializable {
public void writeData(ObjectDataOutput out) throws IOException {
out.writeString(name);
out.writeObject(jobContext);
+ out.writeObject(envOptions);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
this.name = in.readString();
this.jobContext = in.readObject();
+ this.envOptions = in.readObject();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
index 98f6c4732..cfaa5ae65 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -49,7 +49,7 @@ public class SeaTunnelConfig {
}
/**
- * Returns the absolute path for seatunnel.home based from the system property
+ * Returns the absolute path for `seatunnel.home` based from the system property
* {@link SeaTunnelProperties#SEATUNNEL_HOME}
*/
private static String seatunnelHome() {
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5531db478..39ef2ce0c 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -19,14 +19,24 @@ package org.apache.seatunnel.engine.common.config;
import static com.hazelcast.internal.config.DomConfigHelper.childElements;
import static com.hazelcast.internal.config.DomConfigHelper.cleanNodeName;
+import static com.hazelcast.internal.config.DomConfigHelper.getBooleanValue;
import static com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
+
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
import org.w3c.dom.Node;
public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor {
+ private static final ILogger LOGGER = Logger.getLogger(YamlSeaTunnelDomConfigProcessor.class);
+
private final SeaTunnelConfig config;
YamlSeaTunnelDomConfigProcessor(boolean domLevel3, SeaTunnelConfig config) {
@@ -60,20 +70,83 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
return false;
}
- @SuppressWarnings("checkstyle:RegexpSinglelineJava")
+ private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) {
+ SlotServiceConfig slotServiceConfig = new SlotServiceConfig();
+ for (Node node : childElements(slotServiceNode)) {
+ String name = cleanNodeName(node);
+ if (ServerConfigOptions.DYNAMIC_SLOT.key().equals(name)) {
+ slotServiceConfig.setDynamicSlot(getBooleanValue(getTextContent(node)));
+ } else if (ServerConfigOptions.SLOT_NUM.key().equals(name)) {
+ slotServiceConfig.setSlotNum(getIntegerValue(ServerConfigOptions.SLOT_NUM.key(), getTextContent(node)));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
+ }
+ }
+ return slotServiceConfig;
+ }
+
private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
String name = cleanNodeName(node);
- switch (name) {
- case "backup-count":
- engineConfig.setBackupCount(
- getIntegerValue("backup-count", getTextContent(node))
- );
- break;
- default:
- throw new AssertionError("Unrecognized element: " + name);
+ if (ServerConfigOptions.BACKUP_COUNT.key().equals(name)) {
+ engineConfig.setBackupCount(
+ getIntegerValue(ServerConfigOptions.BACKUP_COUNT.key(), getTextContent(node))
+ );
+ } else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
+ engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
+ getTextContent(node)));
+ } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
+ engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
+ } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
+ engineConfig.setCheckpointConfig(parseCheckpointConfig(node));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
}
}
}
+
+ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) {
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ for (Node node : childElements(checkpointNode)) {
+ String name = cleanNodeName(node);
+ if (ServerConfigOptions.CHECKPOINT_INTERVAL.key().equals(name)) {
+ checkpointConfig.setCheckpointInterval(
+ getIntegerValue(ServerConfigOptions.CHECKPOINT_INTERVAL.key(), getTextContent(node))
+ );
+ } else if (ServerConfigOptions.CHECKPOINT_TIMEOUT.key().equals(name)) {
+ checkpointConfig.setCheckpointTimeout(getIntegerValue(ServerConfigOptions.CHECKPOINT_TIMEOUT.key(),
+ getTextContent(node)));
+ } else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
+ checkpointConfig.setMaxConcurrentCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
+ getTextContent(node)));
+ } else if (ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) {
+ checkpointConfig.setTolerableFailureCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(),
+ getTextContent(node)));
+ } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
+ checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
+ }
+ }
+
+ return checkpointConfig;
+ }
+
+ private CheckpointStorageConfig parseCheckpointStorageConfig(Node checkpointStorageConfigNode) {
+ CheckpointStorageConfig checkpointStorageConfig = new CheckpointStorageConfig();
+ for (Node node : childElements(checkpointStorageConfigNode)) {
+ String name = cleanNodeName(node);
+ if (ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.key().equals(name)) {
+ checkpointStorageConfig.setStorage(getTextContent(node));
+ } else if (ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) {
+ checkpointStorageConfig.setMaxRetainedCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(),
+ getTextContent(node)));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
+ }
+ }
+ return checkpointStorageConfig;
+ }
+
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
new file mode 100644
index 000000000..75917b8b8
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@SuppressWarnings("checkstyle:MagicNumber")
+public class CheckpointConfig implements Serializable {
+
+ public static final long MINIMAL_CHECKPOINT_TIME = 10;
+
+ private long checkpointInterval = ServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
+ private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
+ private int maxConcurrentCheckpoints = ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();
+ private int tolerableFailureCheckpoints = ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue();
+
+ private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
+
+ public void setCheckpointInterval(long checkpointInterval) {
+ checkArgument(checkpointInterval >= MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 mills.");
+ this.checkpointInterval = checkpointInterval;
+ }
+
+ public void setCheckpointTimeout(long checkpointTimeout) {
+ checkArgument(checkpointTimeout >= MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 mills.");
+ this.checkpointTimeout = checkpointTimeout;
+ }
+
+ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+ checkArgument(maxConcurrentCheckpoints >= 1, "The minimum number of concurrent checkpoints is 1.");
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ }
+
+ public void setTolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
+ checkArgument(maxConcurrentCheckpoints >= 0, "The number of tolerance failed checkpoints must be a natural number.");
+ this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
index 88697461e..18f915c88 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
@@ -15,22 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.common.config;
-
-import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+package org.apache.seatunnel.engine.common.config.server;
import lombok.Data;
@Data
-public class EngineConfig {
- private int backupCount;
+public class CheckpointStorageConfig {
- @SuppressWarnings("checkstyle:MagicNumber")
- private int serverExecutorPoolSize = 20;
+ private String storage = ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
- public EngineConfig setBackupCount(int newBackupCount) {
- checkBackupCount(newBackupCount, 0);
- this.backupCount = newBackupCount;
- return this;
- }
+ private int maxRetainedCheckpoints = ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue();
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
new file mode 100644
index 000000000..cde0901ac
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@SuppressWarnings("checkstyle:MagicNumber")
+public class ServerConfigOptions {
+
+ public static final Option<Integer> BACKUP_COUNT = Options.key("backup-count").intType().defaultValue(1).withDescription("The number of backup copies of each partition.");
+
+ public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task.");
+
+ public static final Option<Boolean> DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot.");
+
+ public static final Option<Integer> SLOT_NUM = Options.key("slot-num").intType().defaultValue(2).withDescription("The number of slots. Only valid when dynamic slot is disabled.");
+
+ public static final Option<Integer> CHECKPOINT_INTERVAL = Options.key("interval").intType().defaultValue(300000).withDescription("The interval (in milliseconds) between two consecutive checkpoints.");
+
+ public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(300000).withDescription("The timeout (in milliseconds) for a checkpoint.");
+
+ public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT = Options.key("max-concurrent").intType().defaultValue(1).withDescription("The maximum number of concurrent checkpoints.");
+
+ public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILURE = Options.key("tolerable-failure").intType().defaultValue(0).withDescription("The tolerable failure number of a checkpoint.");
+
+ public static final Option<String> CHECKPOINT_STORAGE_TYPE = Options.key("type").stringType().defaultValue("localfile").withDescription("The checkpoint storage type.");
+
+ public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED = Options.key("max-retained").intType().defaultValue(1).withDescription("The maximum number of retained checkpoints.");
+
+ public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE = Options.key("storage").type(new TypeReference<CheckpointStorageConfig>() {
+ }).defaultValue(new CheckpointStorageConfig()).withDescription("The checkpoint storage configuration.");
+
+ public static final Option<SlotServiceConfig> SLOT_SERVICE = Options.key("slot-service").type(new TypeReference<SlotServiceConfig>() {
+ }).defaultValue(new SlotServiceConfig()).withDescription("The slot service configuration.");
+
+ public static final Option<CheckpointConfig> CHECKPOINT = Options.key("checkpoint").type(new TypeReference<CheckpointConfig>() {
+ }).defaultValue(new CheckpointConfig()).withDescription("The checkpoint configuration.");
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
similarity index 62%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
index 4f108d608..1160d9d24 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
@@ -15,24 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.checkpoint;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-
-@Getter
-@EqualsAndHashCode
-@Builder(builderClassName = "Builder")
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class CheckpointStorageConfiguration {
- private final String storage;
- private final int maxRetainedCheckpoints;
-
- public static final class Builder {
- private String storage = "localfile";
- private int maxRetainedCheckpoints = 1;
+package org.apache.seatunnel.engine.common.config.server;
+
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SlotServiceConfig implements Serializable {
+
+ private boolean dynamicSlot = ServerConfigOptions.DYNAMIC_SLOT.defaultValue();
+
+ private int slotNum = ServerConfigOptions.SLOT_NUM.defaultValue();
+
+ public void setSlotNum(int slotNum) {
+ checkPositive(slotNum, ServerConfigOptions.SLOT_NUM + " must be > 0");
+ this.slotNum = slotNum;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
new file mode 100644
index 000000000..f4d507e85
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class YamlSeaTunnelConfigParserTest {
+
+ @Test
+ public void testSeaTunnelConfig() {
+ YamlSeaTunnelConfigLocator yamlConfigLocator = new YamlSeaTunnelConfigLocator();
+ SeaTunnelConfig config;
+ if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+ // 2. Try loading YAML config from the working directory or from the classpath
+ config = new YamlSeaTunnelConfigBuilder(yamlConfigLocator).setProperties(null).build();
+ } else {
+ throw new RuntimeException("can't find yaml in resources");
+ }
+ Assertions.assertNotNull(config);
+
+ Assertions.assertEquals(config.getEngineConfig().getBackupCount(), 1);
+
+ Assertions.assertEquals(config.getEngineConfig().getPrintExecutionInfoInterval(), 2);
+
+ Assertions.assertFalse(config.getEngineConfig().getSlotServiceConfig().isDynamicSlot());
+
+ Assertions.assertEquals(config.getEngineConfig().getSlotServiceConfig().getSlotNum(), 5);
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointInterval(), 6000);
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout(), 7000);
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints(), 5);
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints(), 2);
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getStorage(), "test");
+
+ Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints(), 3);
+
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml
new file mode 100644
index 000000000..9552c382e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast-client.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast-client:
+ cluster-name: seatunnel
+
+ network:
+ cluster-members:
+ - localhost:5801
+ - localhost:5802
+ - localhost:5803
+ - localhost:5804
+ - localhost:5805
+ - localhost:5806
+ - localhost:5807
+ - localhost:5808
+ - localhost:5809
+ - localhost:5810
+ - localhost:5811
+ - localhost:5812
+ - localhost:5813
+ - localhost:5814
+ - localhost:5815
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml
new file mode 100644
index 000000000..21f4d544d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/hazelcast.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - localhost
+ port:
+ auto-increment: true
+ port-count: 100
+ port: 5801
+ map:
+ map-name-template:
+ map-store:
+ enabled: true
+ initial-mode: EAGER
+ class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
+ properties:
+ path: /tmp/file-store-map
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
new file mode 100644
index 000000000..4a2e85121
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ backup-count: 1
+ print-execution-info-interval: 2
+ slot-service:
+ dynamic-slot: false
+ slot-num: 5
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ max-concurrent: 5
+ tolerable-failure: 2
+ storage:
+ type: test
+ max-retained: 3
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index e817ed490..1787eda0a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -117,6 +118,9 @@ public class JobConfigParser {
} else {
jobConfig.getJobContext().setJobMode(JobMode.BATCH);
}
+ if (envConfigs.hasPath("checkpoint.interval")) {
+ jobConfig.getEnvOptions().put(ServerConfigOptions.CHECKPOINT_INTERVAL.key(), envConfigs.getInt("checkpoint.interval"));
+ }
}
/**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 78ec4158d..8c2bb030e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -56,13 +57,13 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class CoordinatorService {
- private NodeEngineImpl nodeEngine;
+ private final NodeEngineImpl nodeEngine;
private final ILogger logger;
private volatile ResourceManager resourceManager;
/**
- * IMap key is jobId and value is a Tuple2
+ * IMap key is jobId and value is a Tuple2.
* Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job
* <p>
* This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node active
@@ -118,16 +119,19 @@ public class CoordinatorService {
private ScheduledExecutorService masterActiveListener;
+ private final EngineConfig engineConfig;
+
@SuppressWarnings("checkstyle:MagicNumber")
- public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer) {
+ public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
this.executorService =
Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat("seatunnel-coordinator-service-%d").build());
this.seaTunnelServer = seaTunnelServer;
+ this.engineConfig = engineConfig;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
- masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+ masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}
public JobMaster getJobMaster(Long jobId) {
@@ -184,7 +188,8 @@ public class CoordinatorService {
getResourceManager(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
- ownedSlotProfilesIMap);
+ ownedSlotProfilesIMap,
+ engineConfig);
try {
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
@@ -308,7 +313,7 @@ public class CoordinatorService {
getResourceManager(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
- ownedSlotProfilesIMap);
+ ownedSlotProfilesIMap, engineConfig);
executorService.submit(() -> {
try {
runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
@@ -330,7 +335,7 @@ public class CoordinatorService {
runningJobMasterMap.remove(jobId);
}
});
- return new PassiveCompletableFuture(voidCompletableFuture);
+ return new PassiveCompletableFuture<>(voidCompletableFuture);
}
private void removeJobIMap(JobMaster jobMaster) {
@@ -412,8 +417,6 @@ public class CoordinatorService {
/**
* return true if this node is a master node and the coordinator service init finished.
- *
- * @return
*/
public boolean isCoordinatorActive() {
return isActive;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 311673271..e857d9bd8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -31,7 +31,7 @@ public class NodeExtension extends DefaultNodeExtension {
public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
super(node);
- extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node, seaTunnelConfig));
+ extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 269fd3ed5..397f60434 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
-import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -46,11 +45,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+
private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
+
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
private NodeEngineImpl nodeEngine;
- private final ILogger logger;
private final LiveOperationRegistry liveOperationRegistry;
private volatile SlotService slotService;
@@ -62,11 +62,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private volatile boolean isRunning = true;
- public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
- this.logger = node.getLogger(getClass());
+ public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
- logger.info("SeaTunnel server start...");
+ LOGGER.info("SeaTunnel server start...");
}
/**
@@ -76,7 +75,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
if (slotService == null) {
synchronized (this) {
if (slotService == null) {
- SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, true, 2);
+ SlotService service = new DefaultSlotService(nodeEngine, taskExecutionService, seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
service.init();
slotService = service;
}
@@ -95,9 +94,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
);
taskExecutionService.start();
getSlotService();
- coordinatorService = new CoordinatorService(nodeEngine, this);
+ coordinatorService = new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
monitorService = Executors.newSingleThreadScheduledExecutor();
- monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
+ monitorService.scheduleAtFixedRate(this::printExecutionInfo, 0, seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(), TimeUnit.SECONDS);
}
@Override
@@ -134,7 +133,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
this.getCoordinatorService().memberRemoved(event);
}
} catch (SeaTunnelEngineException e) {
- logger.severe("Error when handle member removed event", e);
+ LOGGER.severe("Error when handle member removed event", e);
}
}
@@ -147,7 +146,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
* Used for debugging on call
*/
public String printMessage(String message) {
- this.logger.info(nodeEngine.getThisAddress() + ":" + message);
+ LOGGER.info(nodeEngine.getThisAddress() + ":" + message);
return message;
}
@@ -162,7 +161,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
// TODO the retry count and sleep time need configurable
while (!coordinatorService.isCoordinatorActive() && retryCount < 20 && isRunning) {
try {
- logger.warning("This is master node, waiting the coordinator service init finished");
+ LOGGER.warning("This is master node, waiting the coordinator service init finished");
Thread.sleep(1000);
retryCount++;
} catch (InterruptedException e) {
@@ -187,17 +186,13 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
* return whether task is end
*
* @param taskGroupLocation taskGroupLocation
- * @return
*/
public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
IMap<Object, Object> runningJobState =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
- if (runningJobState == null) {
- return false;
- }
Object taskState = runningJobState.get(taskGroupLocation);
- return taskState == null ? false : ((ExecutionState) taskState).isEndState();
+ return taskState != null && ((ExecutionState) taskState).isEndState();
}
@SuppressWarnings("checkstyle:MagicNumber")
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index df0e06122..557c365b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -82,8 +83,6 @@ public class CheckpointCoordinator {
private final CheckpointStorage checkpointStorage;
- private final CheckpointStorageConfiguration storageConfig;
-
@Getter
private final CheckpointIDCounter checkpointIdCounter;
@@ -106,7 +105,7 @@ public class CheckpointCoordinator {
private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
- private final CheckpointCoordinatorConfiguration coordinatorConfig;
+ private final CheckpointConfig coordinatorConfig;
private int tolerableFailureCheckpoints;
private final transient ScheduledExecutorService scheduler;
@@ -123,22 +122,21 @@ public class CheckpointCoordinator {
@SneakyThrows
public CheckpointCoordinator(CheckpointManager manager,
CheckpointStorage checkpointStorage,
- CheckpointStorageConfiguration storageConfig,
+ CheckpointConfig checkpointConfig,
long jobId,
CheckpointPlan plan,
- CheckpointCoordinatorConfiguration coordinatorConfig,
CheckpointIDCounter checkpointIdCounter,
PipelineState pipelineState) {
+
this.checkpointManager = manager;
this.checkpointStorage = checkpointStorage;
- this.storageConfig = storageConfig;
this.jobId = jobId;
this.pipelineId = plan.getPipelineId();
this.plan = plan;
- this.coordinatorConfig = coordinatorConfig;
+ this.coordinatorConfig = checkpointConfig;
this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints();
this.pendingCheckpoints = new ConcurrentHashMap<>();
- this.completedCheckpoints = new ArrayDeque<>(storageConfig.getMaxRetainedCheckpoints() + 1);
+ this.completedCheckpoints = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
this.scheduler = Executors.newScheduledThreadPool(
1, runnable -> {
Thread thread = new Thread(runnable);
@@ -419,7 +417,7 @@ public class CheckpointCoordinator {
.pipelineId(pipelineId)
.states(states)
.build());
- if (completedCheckpoints.size() > storageConfig.getMaxRetainedCheckpoints()) {
+ if (completedCheckpoints.size() > coordinatorConfig.getStorage().getMaxRetainedCheckpoints()) {
CompletedCheckpoint superfluous = completedCheckpoints.removeFirst();
checkpointStorage.deleteCheckpoint(
String.valueOf(superfluous.getJobId()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
deleted file mode 100644
index 257bdbe6a..000000000
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.checkpoint;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-public class CheckpointCoordinatorConfiguration implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public static final long MINIMAL_CHECKPOINT_TIME = 10;
-
- private final long checkpointInterval;
-
- private final long checkpointTimeout;
-
- private final int maxConcurrentCheckpoints;
-
- private final int tolerableFailureCheckpoints;
-
- private CheckpointCoordinatorConfiguration(long checkpointInterval,
- long checkpointTimeout,
- int maxConcurrentCheckpoints,
- int tolerableFailureCheckpoints) {
- this.checkpointInterval = checkpointInterval;
- this.checkpointTimeout = checkpointTimeout;
- this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
- }
-
- public long getCheckpointInterval() {
- return checkpointInterval;
- }
-
- public long getCheckpointTimeout() {
- return checkpointTimeout;
- }
-
- public int getMaxConcurrentCheckpoints() {
- return maxConcurrentCheckpoints;
- }
-
- public int getTolerableFailureCheckpoints() {
- return tolerableFailureCheckpoints;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CheckpointCoordinatorConfiguration that = (CheckpointCoordinatorConfiguration) o;
- return checkpointInterval == that.checkpointInterval
- && checkpointTimeout == that.checkpointTimeout
- && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints
- && tolerableFailureCheckpoints == that.tolerableFailureCheckpoints;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- checkpointInterval,
- checkpointTimeout,
- maxConcurrentCheckpoints,
- tolerableFailureCheckpoints);
- }
-
- @Override
- public String toString() {
- return "CheckpointCoordinatorConfiguration{" +
- "checkpointInterval=" + checkpointInterval +
- ", checkpointTimeout=" + checkpointTimeout +
- ", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
- ", tolerableFailureCheckpoints=" + tolerableFailureCheckpoints +
- '}';
- }
-
- public static CheckpointCoordinatorConfiguration.Builder builder() {
- return new Builder();
- }
-
- @SuppressWarnings("MagicNumber")
- public static final class Builder {
- // TODO 5000 is for test, we can update checkpointInterval to 300000 after we support it read from job config
- private long checkpointInterval = 5000;
- private long checkpointTimeout = 300000;
- private int maxConcurrentCheckpoints = 1;
- private int tolerableFailureCheckpoints = 0;
-
- private Builder() {
- }
-
- public Builder checkpointInterval(long checkpointInterval) {
- checkArgument(checkpointInterval < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 mills.");
- this.checkpointInterval = checkpointInterval;
- return this;
- }
-
- public Builder checkpointTimeout(long checkpointTimeout) {
- checkArgument(checkpointTimeout < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 mills.");
- this.checkpointTimeout = checkpointTimeout;
- return this;
- }
-
- public Builder maxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
- checkArgument(maxConcurrentCheckpoints < 1, "The minimum number of concurrent checkpoints is 1.");
- this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- return this;
- }
-
- public Builder tolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
- checkArgument(maxConcurrentCheckpoints < 0, "The number of tolerance failed checkpoints must be a natural number.");
- this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
- return this;
- }
-
- public CheckpointCoordinatorConfiguration build() {
- return new CheckpointCoordinatorConfiguration(
- checkpointInterval,
- checkpointTimeout,
- maxConcurrentCheckpoints,
- tolerableFailureCheckpoints);
- }
- }
-}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 5286c1aae..fd48f7e65 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -79,12 +80,11 @@ public class CheckpointManager {
public CheckpointManager(long jobId,
NodeEngine nodeEngine,
Map<Integer, CheckpointPlan> checkpointPlanMap,
- CheckpointCoordinatorConfiguration coordinatorConfig,
- CheckpointStorageConfiguration storageConfig) throws CheckpointStorageException {
+ CheckpointConfig checkpointConfig) throws CheckpointStorageException {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.subtaskWithAddresses = new HashMap<>();
- this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, storageConfig.getStorage())
+ this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
.create(new HashMap<>());
IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
this.coordinatorMap = checkpointPlanMap.values().parallelStream()
@@ -99,10 +99,9 @@ public class CheckpointManager {
}
return new CheckpointCoordinator(this,
checkpointStorage,
- storageConfig,
+ checkpointConfig,
jobId,
plan,
- coordinatorConfig,
idCounter,
pipelineState);
} catch (Exception e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index b17bdc705..2df8df9c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -21,15 +21,17 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorConfiguration;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
@@ -94,13 +96,15 @@ public class JobMaster extends Thread {
private volatile boolean restore = false;
+ private final EngineConfig engineConfig;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@NonNull ResourceManager resourceManager,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
- @NonNull IMap ownedSlotProfilesIMap) {
+ @NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
@@ -110,6 +114,7 @@ public class JobMaster extends Thread {
this.resourceManager = resourceManager;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+ this.engineConfig = engineConfig;
}
public void init(long initializationTimestamp) throws Exception {
@@ -129,6 +134,9 @@ public class JobMaster extends Thread {
} else {
logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
}
+
+ CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig().getEnvOptions());
+
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
jobImmutableInformation,
@@ -144,9 +152,23 @@ public class JobMaster extends Thread {
jobImmutableInformation.getJobId(),
nodeEngine,
planTuple.f1(),
- // TODO: checkpoint config
- CheckpointCoordinatorConfiguration.builder().build(),
- CheckpointStorageConfiguration.builder().build());
+ checkpointConfig);
+ }
+
+ // TODO replace it after ReadableConfig Support parse yaml format, then use only one config to read engine and env config.
+ private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine, Map<String, Object> env) {
+ CheckpointConfig checkpointConfig = new CheckpointConfig();
+ if (env.containsKey(ServerConfigOptions.CHECKPOINT_INTERVAL.key())) {
+ checkpointConfig.setCheckpointInterval((Integer) env.get(ServerConfigOptions.CHECKPOINT_INTERVAL.key()));
+ }
+ checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
+ checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
+ checkpointConfig.setMaxConcurrentCheckpoints(engine.getMaxConcurrentCheckpoints());
+ CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
+ storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
+ storageConfig.setStorage(engine.getStorage().getStorage());
+ checkpointConfig.setStorage(storageConfig);
+ return checkpointConfig;
}
public void initStateFuture() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index fc9a06d30..e85cff0b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.service.slot;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -60,19 +61,17 @@ public class DefaultSlotService implements SlotService {
private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
private ScheduledExecutorService scheduledExecutorService;
- private final boolean dynamicSlot;
- private final int slotNumber;
+ private final SlotServiceConfig config;
private volatile boolean initStatus;
private final IdGenerator idGenerator;
private final TaskExecutionService taskExecutionService;
private ConcurrentMap<Integer, SlotContext> contexts;
- public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot,
- int slotNumber) {
+ public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService,
+ SlotServiceConfig config) {
this.nodeEngine = nodeEngine;
- this.dynamicSlot = dynamicSlot;
+ this.config = config;
this.taskExecutionService = taskExecutionService;
- this.slotNumber = slotNumber;
this.idGenerator = new IdGenerator();
}
@@ -86,7 +85,7 @@ public class DefaultSlotService implements SlotService {
assignedResource = new AtomicReference<>(new ResourceProfile());
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,
String.format("hz.%s.seaTunnel.slotService.thread", nodeEngine.getHazelcastInstance().getName())));
- if (!dynamicSlot) {
+ if (!config.isDynamicSlot()) {
initFixedSlots();
}
unassignedResource.set(getNodeResource());
@@ -154,7 +153,7 @@ public class DefaultSlotService implements SlotService {
assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
profile.unassigned();
- if (!dynamicSlot) {
+ if (!config.isDynamicSlot()) {
unassignedSlots.put(profile.getSlotID(), profile);
}
assignedSlots.remove(profile.getSlotID());
@@ -169,10 +168,10 @@ public class DefaultSlotService implements SlotService {
}
private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
- if (unassignedSlots.isEmpty() && !dynamicSlot) {
+ if (unassignedSlots.isEmpty() && !config.isDynamicSlot()) {
return null;
}
- if (dynamicSlot) {
+ if (config.isDynamicSlot()) {
if (unassignedResource.get().enoughThan(profile)) {
return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile);
}
@@ -196,9 +195,9 @@ public class DefaultSlotService implements SlotService {
private void initFixedSlots() {
long maxMemory = Runtime.getRuntime().maxMemory();
- for (int i = 0; i < slotNumber; i++) {
+ for (int i = 0; i < config.getSlotNum(); i++) {
unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
- new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+ new ResourceProfile(CPU.of(0), Memory.of(maxMemory / config.getSlotNum()))));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 1801def2b..0ee8bbd80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
@@ -45,7 +47,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException {
long jobId = (long) (Math.random() * 1000000L);
CheckpointStorage checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
- CheckpointStorageConfiguration.builder().build().getStorage())
+ new CheckpointStorageConfig().getStorage())
.create(new HashMap<>());
CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobId, 1, 1,
Instant.now().toEpochMilli(),
@@ -63,8 +65,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
jobId,
nodeEngine,
planMap,
- CheckpointCoordinatorConfiguration.builder().build(),
- CheckpointStorageConfiguration.builder().build());
+ new CheckpointConfig());
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
CompletableFuture<Void> future = checkpointManager.listenPipeline(1, org.apache.seatunnel.engine.core.job.PipelineState.FINISHED);
future.join();