You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/09 10:50:05 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 246c64811 [ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)
246c64811 is described below
commit 246c648113fc8428fda8a950d0a135f2397ad7d3
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Aug 9 18:49:58 2022 +0800
[ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)
* Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
* add source file to licenserc.yaml
* fix checkstyle
* fix error
* tmp
* tmp
* Update PhysicalPlan to support scheduler by pipeline
* remove init in run
* fix string format error
---
LICENSE | 27 ++-
.../src/main/bin/start-seatunnel-seatunnel.sh | 64 ++++++
.../seatunnel/engine/client/JobConfigParser.java | 19 +-
.../engine/client/SeaTunnelClientTest.java | 2 +-
.../engine/common/config/EngineConfig.java | 8 +
.../config/YamlSeaTunnelDomConfigProcessor.java | 5 +
...ionSeaTunnel.java => JobCanceledException.java} | 9 +-
...SeaTunnel.java => JobDefineCheckException.java} | 6 +-
...ckExceptionSeaTunnel.java => JobException.java} | 10 +-
...ptionSeaTunnel.java => JobFailedException.java} | 9 +-
...ionSeaTunnel.java => JobNotFoundException.java} | 8 +-
.../engine/common/utils/ExceptionUtil.java | 8 +-
.../engine/common/utils/NonCompletableFuture.java | 73 ++++++
.../seatunnel/engine/core/job/JobStatus.java | 103 +++++++++
.../seatunnel/engine/core/job/PipelineState.java | 79 +++++++
.../seatunnel/engine/core/job/StatusUpdate.java} | 18 +-
.../seatunnel/engine/server/NodeExtension.java | 7 +-
.../engine/server/SeaTunnelNodeContext.java | 13 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 33 ++-
.../engine/server/SeaTunnelServerStarter.java | 12 +-
.../engine/server/dag/execution/ExecutionPlan.java | 13 +-
.../dag/execution/ExecutionPlanGenerator.java | 36 ++-
.../engine/server/dag/physical/PhysicalPlan.java | 132 +++++++++--
.../server/dag/physical/PhysicalPlanGenerator.java | 247 +++++++++++++++------
.../server/dag/physical/PhysicalPlanUtils.java | 21 +-
.../engine/server/dag/physical/PhysicalVertex.java | 137 ++++++++++++
.../engine/server/dag/physical/SubPlan.java | 137 ++++++++++++
.../engine/server/execution/ExecutionState.java | 79 +++++++
.../{TaskGroup.java => TaskExecutionState.java} | 32 ++-
.../engine/server/execution/TaskGroup.java | 9 +-
.../seatunnel/engine/server/master/JobMaster.java | 70 +++---
.../JobScheduler.java} | 17 +-
32 files changed, 1209 insertions(+), 234 deletions(-)
diff --git a/LICENSE b/LICENSE
index 696821eed..eb58900c3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -210,17 +210,20 @@ The text of each license is the standard Apache 2.0 license.
tools/dependencies/checkLicense.sh files from https://github.com/apache/skywalking
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java from https://github.com/apache/flink
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
-generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java from https://github.com/hazelcast/hazelcast
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
+generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java from https://github.com/apache/flink
+
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
new file mode 100755
index 000000000..1cfc741d8
--- /dev/null
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eu
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ] ; do
+ # shellcheck disable=SC2006
+ ls=`ls -ld "$PRG"`
+ # shellcheck disable=SC2006
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ # shellcheck disable=SC2006
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRG_DIR=`dirname "$PRG"`
+APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+ . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+ args="-h"
+else
+ args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+ # print usage
+ echo "${CMD}"
+ exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+ echo "Execute SeaTunnel Job: ${CMD}"
+ eval ${CMD}
+else
+ echo "${CMD}"
+ exit ${EXIT_CODE}
+fi
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index cbd014db7..58b2ee3ad 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -73,7 +73,8 @@ public class JobConfigParser {
private JobConfig jobConfig;
- protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator, @NonNull JobConfig jobConfig) {
+ protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator,
+ @NonNull JobConfig jobConfig) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
@@ -87,7 +88,7 @@ public class JobConfigParser {
List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
if (CollectionUtils.isEmpty(sinkConfigs) || CollectionUtils.isEmpty(sourceConfigs)) {
- throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can not be null");
+ throw new JobDefineCheckException("Source And Sink can not be null");
}
jobConfigAnalyze(envConfigs);
@@ -129,7 +130,7 @@ public class JobConfigParser {
actions.add(sinkAction);
if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
- throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+ throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+ " must be set in the sink plugin config when the job have complex dependencies");
}
String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
@@ -137,7 +138,7 @@ public class JobConfigParser {
if (CollectionUtils.isEmpty(transformConfigList)) {
sourceAnalyze(sourceTableName, sinkAction);
} else if (transformConfigList.size() > 1) {
- throw new JobDefineCheckExceptionSeaTunnel("Only UnionTransform can have more than one upstream, "
+ throw new JobDefineCheckException("Only UnionTransform can have more than one upstream, "
+ sinkAction.getName()
+ " is not UnionTransform Connector");
} else {
@@ -150,7 +151,7 @@ public class JobConfigParser {
private void sourceAnalyze(String sourceTableName, Action action) {
List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
if (CollectionUtils.isEmpty(sourceConfigList)) {
- throw new JobDefineCheckExceptionSeaTunnel(action.getName()
+ throw new JobDefineCheckException(action.getName()
+ " source table name [" + sourceTableName + "] can not be found");
}
@@ -203,7 +204,7 @@ public class JobConfigParser {
private void initRelationMap(List<? extends Config> sourceConfigs, List<? extends Config> transformConfigs) {
for (Config config : sourceConfigs) {
if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
- throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+ throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+ " must be set in the source plugin config when the job have complex dependencies");
}
String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
@@ -215,12 +216,12 @@ public class JobConfigParser {
for (Config config : transformConfigs) {
if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
- throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+ throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
+ " must be set in the transform plugin config when the job have complex dependencies");
}
if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
- throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+ throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
+ " must be set in the transform plugin config when the job have complex dependencies");
}
String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 2c451034c..ee9e2533a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -44,7 +44,7 @@ public class SeaTunnelClientTest {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
- new SeaTunnelNodeContext());
+ new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
@Test
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index e62481cf8..12f562c90 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -25,9 +25,17 @@ import lombok.Data;
public class EngineConfig {
private int backupCount;
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private int serverExecutorPoolSize = 20;
+
public EngineConfig setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
return this;
}
+
+ public EngineConfig setServerExecutorPoolSize(int serverExecutorPoolSize) {
+ this.serverExecutorPoolSize = serverExecutorPoolSize;
+ return this;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5531db478..5d75a540a 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -71,6 +71,11 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
getIntegerValue("backup-count", getTextContent(node))
);
break;
+ case "server-executor-pool-size":
+ engineConfig.setServerExecutorPoolSize(
+ getIntegerValue("server-executor-pool-size", getTextContent(node))
+ );
+ break;
default:
throw new AssertionError("Unrecognized element: " + name);
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
similarity index 76%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
index 2d0e10fe6..0d4daff1c 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
@@ -17,13 +17,16 @@
package org.apache.seatunnel.engine.common.exception;
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobCanceledException extends SeaTunnelEngineException {
+ public JobCanceledException(long jobId) {
+ super("Job with id " + jobId + " canceled");
+ }
- public JobDefineCheckExceptionSeaTunnel(String message) {
+ public JobCanceledException(String message) {
super(message);
}
- public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+ public JobCanceledException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
similarity index 80%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
index 2d0e10fe6..d4c758367 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
@@ -17,13 +17,13 @@
package org.apache.seatunnel.engine.common.exception;
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobDefineCheckException extends SeaTunnelEngineException {
- public JobDefineCheckExceptionSeaTunnel(String message) {
+ public JobDefineCheckException(String message) {
super(message);
}
- public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+ public JobDefineCheckException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
index 2d0e10fe6..ba872a63b 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
@@ -17,13 +17,17 @@
package org.apache.seatunnel.engine.common.exception;
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobException extends SeaTunnelEngineException {
- public JobDefineCheckExceptionSeaTunnel(String message) {
+ public JobException(String message) {
super(message);
}
- public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+ public JobException(String message, Throwable cause) {
super(message, cause);
}
+
+ public JobException(long jobId, String message, Throwable cause) {
+ super("Job with id [" + jobId + "] Exception " + message, cause);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
similarity index 76%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
index 2d0e10fe6..dd4bc2323 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
@@ -17,13 +17,16 @@
package org.apache.seatunnel.engine.common.exception;
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobFailedException extends SeaTunnelEngineException {
+ public JobFailedException(long jobId) {
+ super("Job with id " + jobId + " failed");
+ }
- public JobDefineCheckExceptionSeaTunnel(String message) {
+ public JobFailedException(String message) {
super(message);
}
- public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+ public JobFailedException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
similarity index 78%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
index 1b4158886..fccf3c0bd 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
@@ -17,16 +17,16 @@
package org.apache.seatunnel.engine.common.exception;
-public class JobNotFoundExceptionSeaTunnel extends SeaTunnelEngineException {
- public JobNotFoundExceptionSeaTunnel(long jobId) {
+public class JobNotFoundException extends SeaTunnelEngineException {
+ public JobNotFoundException(long jobId) {
super("Job with id " + jobId + " not found");
}
- public JobNotFoundExceptionSeaTunnel(String message) {
+ public JobNotFoundException(String message) {
super(message);
}
- public JobNotFoundExceptionSeaTunnel(String message, Throwable cause) {
+ public JobNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index a3c211af5..8e8bf42ac 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -16,8 +16,8 @@
package org.apache.seatunnel.engine.common.utils;
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
-import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
@@ -36,8 +36,8 @@ public final class ExceptionUtil {
private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
- new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new),
- new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckExceptionSeaTunnel.class, JobDefineCheckExceptionSeaTunnel::new)
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundException.class, JobNotFoundException::new),
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckException.class, JobDefineCheckException::new)
);
private ExceptionUtil() {
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
new file mode 100644
index 000000000..a3c6d2380
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A future which prevents completion by outside caller
+ */
+public class NonCompletableFuture<T> extends CompletableFuture<T> {
+
+ public NonCompletableFuture() {
+ }
+
+ public NonCompletableFuture(CompletableFuture<T> chainedFuture) {
+ chainedFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ internalCompleteExceptionally(t);
+ } else {
+ internalComplete(r);
+ }
+ });
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+ }
+
+ @Override
+ public boolean complete(T value) {
+ throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException("This future can't be cancelled by an outside caller");
+ }
+
+ @Override
+ public void obtrudeException(Throwable ex) {
+ throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+ }
+
+ @Override
+ public void obtrudeValue(T value) {
+ throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+ }
+
+ private void internalComplete(T value) {
+ super.complete(value);
+ }
+
+ private void internalCompleteExceptionally(Throwable ex) {
+ super.completeExceptionally(ex);
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
new file mode 100644
index 000000000..d0c9fd963
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+/** Possible states of a job once it has been accepted by the dispatcher. */
+public enum JobStatus {
+ /**
+ * The job has been received by the Dispatcher, and is waiting for the job manager to receive
+ * leadership and to be created.
+ */
+ INITIALIZING(EndState.NOT_END),
+
+ /** Job is newly created, no task has started to run. */
+ CREATED(EndState.NOT_END),
+
+ /** Some tasks are scheduled or running, some may be pending, some may be finished. */
+ RUNNING(EndState.NOT_END),
+
+ /** The job has failed and is currently waiting for the cleanup to complete. */
+ FAILING(EndState.NOT_END),
+
+ /** The job has failed with a non-recoverable task failure. */
+ FAILED(EndState.GLOBALLY),
+
+ /** Job is being cancelled. */
+ CANCELLING(EndState.NOT_END),
+
+ /** Job has been cancelled. */
+ CANCELED(EndState.GLOBALLY),
+
+ /** All of the job's tasks have successfully finished. */
+ FINISHED(EndState.GLOBALLY),
+
+ /** The job is currently undergoing a reset and total restart. */
+ RESTARTING(EndState.NOT_END),
+
+ /**
+ * The job has been suspended which means that it has been stopped but not been removed from a
+ * potential HA job store.
+ */
+ SUSPENDED(EndState.LOCALLY),
+
+ /** The job is currently reconciling and waits for task execution report to recover state. */
+ RECONCILING(EndState.NOT_END);
+
+ // --------------------------------------------------------------------------------------------
+
+ private enum EndState {
+ NOT_END,
+ LOCALLY,
+ GLOBALLY
+ }
+
+ private final EndState endState;
+
+ JobStatus(EndState endState) {
+ this.endState = endState;
+ }
+
+ /**
+ * Checks whether this state is <i>globally terminal</i>. A globally terminal job is complete
+ * and cannot fail any more and will not be restarted or recovered by another standby master
+ * node.
+ *
+ * <p>When a globally terminal state has been reached, all recovery data for the job is dropped
+ * from the high-availability services.
+ *
+ * @return True, if this job status is globally terminal, false otherwise.
+ */
+ public boolean isGloballyEndState() {
+ return endState == EndState.GLOBALLY;
+ }
+
+ /**
+ * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the state of
+ * a job's execution graph within an executing JobManager. If the execution graph is locally
+ * terminal, the JobManager will not continue executing or recovering the job.
+ *
+ * <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
+ * which is typically entered when the executing JobManager loses its leader status.
+ *
+ * @return True, if this job status is terminal, false otherwise.
+ */
+ public boolean isEndState() {
+ return endState != EndState.NOT_END;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
new file mode 100644
index 000000000..df6c2a370
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+/**
+ * An enumeration of all states that a pipeline can be in during its execution. Pipeline usually start in
+ * the state {@code CREATED} and switch states according to this diagram:
+ *
+ * <pre>{@code
+ * CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
+ * | | | | |
+ * | | | +-----+--------------+
+ * | | V V
+ * | | CANCELLING -----+----> CANCELED
+ * | | |
+ * | +-------------------------+
+ * |
+ * | ... -> FAILED
+ * V
+ * RECONCILING -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ *
+ * }</pre>
+ *
+ * <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED} state if job
+ * manager fail over, and the {@code RECONCILING} state can switch into any existing Pipeline state.
+ *
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are considered terminal
+ * states.
+ */
+public enum PipelineState {
+ CREATED,
+
+ SCHEDULED,
+
+ DEPLOYING,
+
+ RUNNING,
+
+ /**
+ * This state marks "successfully completed". It can only be reached when a program reaches the
+ * "end of its input". The "end of input" can be reached when consuming a bounded input (fix set
+ * of files, bounded query, etc) or when stopping a program (not cancelling!) which make the
+ * input look like it reached its end at a specific point.
+ */
+ FINISHED,
+
+ CANCELING,
+
+ CANCELED,
+
+ FAILED,
+
+ RECONCILING,
+
+ /** Restoring last possible valid state of the pipeline if it has it. */
+ INITIALIZING;
+
+ public boolean isEnd() {
+ return this == FINISHED || this == CANCELED || this == FAILED;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
index b72c62287..b7b23b1a6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
@@ -15,19 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.core.job;
-import java.util.List;
-
-public class ExecutionPlan {
-
- private final List<Pipeline> pipelines;
-
- ExecutionPlan(List<Pipeline> pipelines) {
- this.pipelines = pipelines;
- }
-
- public List<Pipeline> getPipelines() {
- return pipelines;
- }
+public enum StatusUpdate {
+ STOP,
+ CANCEL;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 1bd5a83d4..311673271 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -17,18 +17,21 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.instance.impl.DefaultNodeExtension;
import com.hazelcast.instance.impl.Node;
+import lombok.NonNull;
import java.util.Map;
public class NodeExtension extends DefaultNodeExtension {
private final NodeExtensionCommon extCommon;
- public NodeExtension(Node node) {
+ public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
super(node);
- extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node));
+ extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node, seaTunnelConfig));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 9c112591f..388b0fad9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -17,14 +17,23 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
+import lombok.NonNull;
public class SeaTunnelNodeContext extends DefaultNodeContext {
+ private final SeaTunnelConfig seaTunnelConfig;
+
+ public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
+ this.seaTunnelConfig = seaTunnelConfig;
+ }
+
@Override
- public NodeExtension createNodeExtension(Node node) {
- return new org.apache.seatunnel.engine.server.NodeExtension(node);
+ public NodeExtension createNodeExtension(@NonNull Node node) {
+ return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 42c3cb157..24a36c355 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,7 +17,8 @@
package org.apache.seatunnel.engine.server;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.serialization.Data;
@@ -30,9 +31,12 @@ import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
+import lombok.NonNull;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
@@ -43,13 +47,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private TaskExecutionService taskExecutionService;
- public SeaTunnelServer(Node node) {
+ private final ExecutorService executorService;
+
+ private final SeaTunnelConfig seaTunnelConfig;
+
+ public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
this.logger = node.getLogger(getClass());
this.liveOperationRegistry = new LiveOperationRegistry();
+ this.seaTunnelConfig = seaTunnelConfig;
+ this.executorService =
+ Executors.newFixedThreadPool(seaTunnelConfig.getEngineConfig().getServerExecutorPoolSize());
logger.info("SeaTunnel server start...");
}
- public TaskExecutionService getTaskExecutionService(){
+ public TaskExecutionService getTaskExecutionService() {
return this.taskExecutionService;
}
@@ -103,21 +114,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
*/
@SuppressWarnings("checkstyle:MagicNumber")
public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
- // TODO Here we need new a JobMaster and run it.
- JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
- logger.info("Job [" + jobInformation.getJobId() + "] submit");
- logger.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
- new Thread(() -> {
+ JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
+ executorService.submit(() -> {
try {
- Thread.sleep(2000);
- logger.info("I am sleep 2000 ms");
- } catch (InterruptedException e) {
+ jobMaster.init();
+ } catch (Throwable e) {
throw new RuntimeException(e);
} finally {
+ // We specify that when init is complete, the submitJob is complete
voidCompletableFuture.complete(null);
}
- }).start();
+ jobMaster.run();
+ });
return voidCompletableFuture;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 49db3f60c..0104c1e55 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -17,18 +17,16 @@
package org.apache.seatunnel.engine.server;
-import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
public class SeaTunnelServerStarter {
public static void main(String[] args) {
- Config config = new Config();
- config.getSecurityConfig().setEnabled(false);
- config.getJetConfig().setEnabled(false);
- config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+ SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+ Thread.currentThread().getName(), new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
index b72c62287..26c93b69e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
@@ -17,17 +17,28 @@
package org.apache.seatunnel.engine.server.dag.execution;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
+import lombok.NonNull;
+
import java.util.List;
public class ExecutionPlan {
private final List<Pipeline> pipelines;
- ExecutionPlan(List<Pipeline> pipelines) {
+ private final JobImmutableInformation jobImmutableInformation;
+
+ public ExecutionPlan(@NonNull List<Pipeline> pipelines, @NonNull JobImmutableInformation jobImmutableInformation) {
this.pipelines = pipelines;
+ this.jobImmutableInformation = jobImmutableInformation;
}
public List<Pipeline> getPipelines() {
return pipelines;
}
+
+ public JobImmutableInformation getJobImmutableInformation() {
+ return jobImmutableInformation;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index af3a4c68c..2c16829b0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -29,6 +29,9 @@ import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
+import lombok.NonNull;
import java.net.URL;
import java.util.ArrayList;
@@ -48,9 +51,17 @@ public class ExecutionPlanGenerator {
private final Map<Integer, LogicalVertex> logicalVertexes;
private final List<LogicalEdge> logicalEdges;
- public ExecutionPlanGenerator(LogicalDag logicalPlan) {
+ private final JobImmutableInformation jobImmutableInformation;
+
+ private final long initializationTimestamp;
+
+ public ExecutionPlanGenerator(@NonNull LogicalDag logicalPlan,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp) {
this.logicalVertexes = new HashMap<>(logicalPlan.getLogicalVertexMap());
this.logicalEdges = new ArrayList<>(logicalPlan.getEdges());
+ this.jobImmutableInformation = jobImmutableInformation;
+ this.initializationTimestamp = initializationTimestamp;
}
public ExecutionPlan generate() {
@@ -59,7 +70,7 @@ public class ExecutionPlanGenerator {
throw new IllegalArgumentException("ExecutionPlan Builder must have LogicalPlan and Action");
}
List<LogicalVertex> next = logicalVertexes.values().stream().filter(a -> a.getAction() instanceof SourceAction)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
while (!next.isEmpty()) {
List<LogicalVertex> newNext = new ArrayList<>();
next.forEach(n -> {
@@ -71,12 +82,12 @@ public class ExecutionPlanGenerator {
Map<Integer, LogicalVertex> vertexes = new HashMap<>();
actions.forEach((key, value) -> vertexes.put(key, new LogicalVertex(key, value,
- logicalVertexes.get(key).getParallelism())));
+ logicalVertexes.get(key).getParallelism())));
return new ExecutionPlan(PipelineGenerator.generatePipelines(edgeMap.entrySet().stream()
- .flatMap(e -> e.getValue().stream().map(d -> new ExecutionEdge(convertFromLogical(vertexes.get(e.getKey())),
- convertFromLogical(vertexes.get(d)))))
- .collect(Collectors.toList())));
+ .flatMap(e -> e.getValue().stream().map(d -> new ExecutionEdge(convertFromLogical(vertexes.get(e.getKey())),
+ convertFromLogical(vertexes.get(d)))))
+ .collect(Collectors.toList())), jobImmutableInformation);
}
private ExecutionVertex convertFromLogical(LogicalVertex vertex) {
@@ -105,14 +116,14 @@ public class ExecutionPlanGenerator {
SourceAction<?, ?, ?> source = (SourceAction<?, ?, ?>) start;
jars.addAll(source.getJarUrls());
executionAction = new PhysicalSourceAction<>(start.getId(), start.getName(),
- source.getSource(), new ArrayList<>(jars), transforms);
+ source.getSource(), new ArrayList<>(jars), transforms);
actions.put(start.getId(), start);
} else if (start instanceof TransformAction) {
TransformAction transform = (TransformAction) start;
jars.addAll(transform.getJarUrls());
transforms.add(0, transform.getTransform());
executionAction = new TransformChainAction(start.getId(), start.getName(),
- new ArrayList<>(jars), transforms);
+ new ArrayList<>(jars), transforms);
actions.put(start.getId(), executionAction);
} else {
throw new UnknownActionException(start);
@@ -121,7 +132,8 @@ public class ExecutionPlanGenerator {
final Action e = end;
// find next should be converted action
- List<LogicalVertex> nextStarts = logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
+ List<LogicalVertex> nextStarts =
+ logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
.map(LogicalEdge::getRightVertex).collect(Collectors.toList());
for (LogicalVertex n : nextStarts) {
if (!edgeMap.containsKey(executionAction.getId())) {
@@ -134,11 +146,11 @@ public class ExecutionPlanGenerator {
private List<TransformAction> findMigrateTransform(List<LogicalEdge> executionEdges, Action start) {
List<Action> actionAfterStart =
- executionEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(start))
- .map(edge -> edge.getRightVertex().getAction()).collect(Collectors.toList());
+ executionEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(start))
+ .map(edge -> edge.getRightVertex().getAction()).collect(Collectors.toList());
// make sure the start's next only have one LogicalTransform, so it can be migrated.
if (actionAfterStart.size() != 1 || actionAfterStart.get(0) instanceof PartitionTransformAction ||
- actionAfterStart.get(0) instanceof SinkAction) {
+ actionAfterStart.get(0) instanceof SinkAction) {
return Collections.emptyList();
} else {
List<TransformAction> transforms = new ArrayList<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index af5a9d7c1..ecd684a3a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,38 +17,134 @@
package org.apache.seatunnel.engine.server.dag.physical;
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class PhysicalPlan {
- private final List<SubPlan> plans;
+ private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class);
- public PhysicalPlan(List<SubPlan> plans) {
- this.plans = plans;
- }
+ private final List<SubPlan> pipelineList;
+
+ private AtomicInteger finishedPipelineNum = new AtomicInteger(0);
+
+ private AtomicInteger canceledPipelineNum = new AtomicInteger(0);
+
+ private AtomicInteger failedPipelineNum = new AtomicInteger(0);
+
+ private AtomicReference<JobStatus> jobStatus = new AtomicReference<>();
+
+ private final JobImmutableInformation jobImmutableInformation;
+
+ /**
+ * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+ * execution graph transitioned into a certain state. The index into this array is the ordinal
+ * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
+ * stateTimestamps[RUNNING.ordinal()]}.
+ */
+ private final long[] stateTimestamps;
+
+ /**
+ * when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan
+ * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
+ */
+ private final CompletableFuture<JobStatus> jobEndFuture;
- public List<SubPlan> getPlans() {
- return plans;
- }
- public static class SubPlan {
- private final List<TaskGroupInfo> tasks;
+ /**
+ * This future only can completion by the {@link SubPlan } subPlanFuture.
+ * When subPlanFuture completed, this NonCompletableFuture's whenComplete method will be called.
+ */
+ private final NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
- private final List<TaskGroupInfo> coordinatorTasks;
+ private final ExecutorService executorService;
- public SubPlan(List<TaskGroupInfo> tasks, List<TaskGroupInfo> coordinatorTasks) {
- this.tasks = tasks;
- this.coordinatorTasks = coordinatorTasks;
+ public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
+ @NonNull ExecutorService executorService,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
+ this.executorService = executorService;
+ this.jobImmutableInformation = jobImmutableInformation;
+ stateTimestamps = new long[JobStatus.values().length];
+ this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+ this.jobStatus.set(JobStatus.CREATED);
+ this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+ this.jobEndFuture = new CompletableFuture<JobStatus>();
+ this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
+ this.pipelineList = pipelineList;
+
+ Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
+ x.whenComplete((v, t) -> {
+ if (PipelineState.CANCELED.equals(v)) {
+ canceledPipelineNum.incrementAndGet();
+ } else if (PipelineState.FAILED.equals(v)) {
+ failedPipelineNum.incrementAndGet();
+ }
+
+ if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+ if (failedPipelineNum.get() > 0) {
+ jobStatus.set(JobStatus.FAILING);
+ } else if (canceledPipelineNum.get() > 0) {
+ jobStatus.set(JobStatus.CANCELED);
+ } else {
+ jobStatus.set(JobStatus.FINISHED);
+ }
+ jobEndFuture.complete(jobStatus.get());
+ }
+ });
+ });
+ }
+
+ public List<SubPlan> getPipelineList() {
+ return pipelineList;
+ }
+
+ public void turnToRunning() {
+ if (!updateJobState(JobStatus.CREATED, JobStatus.RUNNING)) {
+ throw new IllegalStateException(
+ "Job may only be scheduled from state " + JobStatus.CREATED);
}
+ }
- public List<TaskGroupInfo> getTasks() {
- return tasks;
+ public boolean updateJobState(JobStatus current, JobStatus targetState) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Job is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
}
- public List<TaskGroupInfo> getCoordinatorTasks() {
- return coordinatorTasks;
+ // now do the actual state transition
+ if (jobStatus.get() == current) {
+ jobStatus.set(targetState);
+ LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
+ current,
+ targetState));
+
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ return true;
+ } else {
+ return false;
}
}
+
+ public CompletableFuture<JobStatus> getJobEndCompletableFuture() {
+ return this.jobEndFuture;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 1bb0652c1..68da4e796 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -18,11 +18,14 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
@@ -30,21 +33,29 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import com.google.common.collect.Lists;
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.NodeEngine;
+import lombok.NonNull;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class PhysicalPlanGenerator {
@@ -54,97 +65,208 @@ public class PhysicalPlanGenerator {
private final IdGenerator idGenerator = new IdGenerator();
- public PhysicalPlanGenerator(ExecutionPlan executionPlan, NodeEngine nodeEngine) {
+ private final JobImmutableInformation jobImmutableInformation;
+
+ private final long initializationTimestamp;
+
+ private final ExecutorService executorService;
+
+ private final FlakeIdGenerator flakeIdGenerator;
+
+ public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
+ @NonNull NodeEngine nodeEngine,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull ExecutorService executorService,
+ @NonNull FlakeIdGenerator flakeIdGenerator) {
edgesList = executionPlan.getPipelines().stream().map(Pipeline::getEdges).collect(Collectors.toList());
this.nodeEngine = nodeEngine;
+ this.jobImmutableInformation = jobImmutableInformation;
+ this.initializationTimestamp = initializationTimestamp;
+ this.executorService = executorService;
+ this.flakeIdGenerator = flakeIdGenerator;
}
public PhysicalPlan generate() {
// TODO Determine which tasks do not need to be restored according to state
- return new PhysicalPlan(edgesList.stream().map(edges -> {
+ AtomicInteger index = new AtomicInteger(-1);
+ CopyOnWriteArrayList<NonCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
+ new CopyOnWriteArrayList<>();
+
+ Stream<SubPlan> subPlanStream = edgesList.stream().map(edges -> {
+ int currIndex = index.incrementAndGet();
+ CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
+ new CopyOnWriteArrayList<>();
List<PhysicalSourceAction<?, ?, ?>> sources = findSourceAction(edges);
- List<TaskGroupInfo> coordinatorTasks = getEnumeratorTask(sources);
+ List<PhysicalVertex> coordinatorVertexList =
+ getEnumeratorTask(sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
- List<TaskGroupInfo> tasks = getSourceTask(edges, sources);
+ List<PhysicalVertex> physicalVertexList =
+ getSourceTask(edges, sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
- tasks.addAll(getPartitionTask(edges));
+ physicalVertexList.addAll(
+ getPartitionTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
- coordinatorTasks.addAll(getCommitterTask(edges));
+ coordinatorVertexList.addAll(
+ getCommitterTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
- return new PhysicalPlan.SubPlan(tasks, coordinatorTasks);
- }).collect(Collectors.toList()));
+ CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
+ waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
+
+ return new SubPlan(currIndex,
+ edgesList.size(),
+ initializationTimestamp,
+ physicalVertexList,
+ coordinatorVertexList,
+ pipelineFuture,
+ waitForCompleteByPhysicalVertexList.toArray(
+ new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]));
+ });
+
+ PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
+ executorService,
+ jobImmutableInformation,
+ initializationTimestamp,
+ waitForCompleteBySubPlanList.toArray(new NonCompletableFuture[waitForCompleteBySubPlanList.size()]));
+ return physicalPlan;
}
private List<PhysicalSourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PhysicalSourceAction)
- .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
- .collect(Collectors.toList());
+ .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
+ .collect(Collectors.toList());
}
- private List<TaskGroupInfo> getCommitterTask(List<ExecutionEdge> edges) {
- return edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
- .map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
- .map(s -> {
- SinkAggregatedCommitterTask t =
- new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
- return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
- }).collect(Collectors.toList());
+ private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
+ int pipelineIndex,
+ int totalPipelineNum,
+ CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ AtomicInteger atomicInteger = new AtomicInteger(-1);
+ List<ExecutionEdge> collect = edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
+ .collect(Collectors.toList());
+
+ return collect.stream().map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
+ .map(s -> {
+ SinkAggregatedCommitterTask t =
+ new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
+
+ CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+ waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+ return new PhysicalVertex(atomicInteger.incrementAndGet(),
+ executorService,
+ collect.size(),
+ new TaskGroup("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ null);
+ }).collect(Collectors.toList());
}
- private List<TaskGroupInfo> getPartitionTask(List<ExecutionEdge> edges) {
+ private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
+ int pipelineIndex,
+ int totalPipelineNum,
+ CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PartitionTransformAction)
- .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
- .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
- .flatMap(flow -> {
- List<TaskGroupInfo> t = new ArrayList<>();
- for (int i = 0; i < flow.getAction().getParallelism(); i++) {
- SeaTunnelTask seaTunnelTask = new SeaTunnelTask(idGenerator.getNextId(), flow);
- t.add(new TaskGroupInfo(toData(new TaskGroup(seaTunnelTask)),
- seaTunnelTask.getJarsUrl()));
- }
- return t.stream();
- }).collect(Collectors.toList());
+ .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
+ .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
+ .flatMap(flow -> {
+ List<PhysicalVertex> t = new ArrayList<>();
+ for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+ SeaTunnelTask seaTunnelTask = new SeaTunnelTask(idGenerator.getNextId(), flow);
+
+ CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+ waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+ t.add(new PhysicalVertex(i,
+ executorService,
+ flow.getAction().getParallelism(),
+ new TaskGroup("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ seaTunnelTask.getJarsUrl()));
+ }
+ return t.stream();
+ }).collect(Collectors.toList());
}
- private List<TaskGroupInfo> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources) {
+ private List<PhysicalVertex> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources,
+ int pipelineIndex,
+ int totalPipelineNum,
+ CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ AtomicInteger atomicInteger = new AtomicInteger(-1);
+
return sources.stream().map(s -> {
SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(idGenerator.getNextId(), s);
- return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
+ CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+ waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+ return new PhysicalVertex(atomicInteger.incrementAndGet(),
+ executorService,
+ sources.size(),
+ new TaskGroup(s.getName(), Lists.newArrayList(t)),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ t.getJarsUrl());
}).collect(Collectors.toList());
}
- private List<TaskGroupInfo> getSourceTask(List<ExecutionEdge> edges,
- List<PhysicalSourceAction<?, ?, ?>> sources) {
+ private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> edges,
+ List<PhysicalSourceAction<?, ?, ?>> sources,
+ int pipelineIndex,
+ int totalPipelineNum,
+ CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
return sources.stream()
- .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
- .flatMap(flow -> {
- List<TaskGroupInfo> t = new ArrayList<>();
- List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
- if (sourceWithSink(flow)) {
- flows.addAll(splitSinkFromFlow(flow));
- }
- for (int i = 0; i < flow.getAction().getParallelism(); i++) {
- List<SeaTunnelTask> taskList =
- flows.stream().map(f -> new SeaTunnelTask(idGenerator.getNextId(), f)).collect(Collectors.toList());
- Set<URL> jars =
- taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
- t.add(new TaskGroupInfo(toData(new TaskGroup(taskList.stream().map(task -> (Task) task).collect(Collectors.toList()))), jars));
- }
- return t.stream();
- }).collect(Collectors.toList());
+ .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
+ .flatMap(flow -> {
+ List<PhysicalVertex> t = new ArrayList<>();
+ List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
+ if (sourceWithSink(flow)) {
+ flows.addAll(splitSinkFromFlow(flow));
+ }
+ for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+ List<SeaTunnelTask> taskList =
+ flows.stream().map(f -> new SeaTunnelTask(idGenerator.getNextId(), f))
+ .collect(Collectors.toList());
+ Set<URL> jars =
+ taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+
+ CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+ waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+ // TODO We need give every task a appropriate name
+ t.add(new PhysicalVertex(i,
+ executorService,
+ flow.getAction().getParallelism(),
+ new TaskGroup("SourceTask",
+ taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ jars));
+ }
+ return t.stream();
+ }).collect(Collectors.toList());
}
private static List<Flow> splitSinkFromFlow(Flow flow) {
List<PhysicalExecutionFlow> sinkFlows =
- flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
- .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+ flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
+ .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
List<Flow> allFlows = new ArrayList<>();
flow.getNext().removeAll(sinkFlows);
sinkFlows.forEach(s -> {
IntermediateDataQueue queue = new IntermediateDataQueue(s.getAction().getId(),
- s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+ s.getAction().getName() + "-Queue", s.getAction().getParallelism());
IntermediateExecutionFlow intermediateFlow = new IntermediateExecutionFlow(queue);
flow.getNext().add(intermediateFlow);
IntermediateExecutionFlow intermediateFlowQuote = new IntermediateExecutionFlow(queue);
@@ -153,26 +275,27 @@ public class PhysicalPlanGenerator {
});
if (flow.getNext().size() > sinkFlows.size()) {
- allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
+ allFlows.addAll(
+ flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
}
return allFlows;
}
private static boolean sourceWithSink(PhysicalExecutionFlow flow) {
return flow.getAction() instanceof SinkAction ||
- flow.getNext().stream().map(f -> (PhysicalExecutionFlow) f).map(PhysicalPlanGenerator::sourceWithSink)
- .collect(Collectors.toList()).contains(true);
+ flow.getNext().stream().map(f -> (PhysicalExecutionFlow) f).map(PhysicalPlanGenerator::sourceWithSink)
+ .collect(Collectors.toList()).contains(true);
}
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
List<Action> actions = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
- .map(e -> e.getLeftVertex().getAction()).collect(Collectors.toList());
+ .map(e -> e.getLeftVertex().getAction()).collect(Collectors.toList());
List<Flow> wrappers = actions.stream()
- .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
- .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+ .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+ .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
wrappers.addAll(actions.stream()
- .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
- .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+ .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+ .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
return wrappers;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
index fc8284877..2e15f159f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
@@ -18,14 +18,29 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.spi.impl.NodeEngine;
+import lombok.NonNull;
+
+import java.util.concurrent.ExecutorService;
public class PhysicalPlanUtils {
- public static PhysicalPlan fromLogicalDAG(LogicalDag logicalDag, NodeEngine nodeEngine) {
- return new PhysicalPlanGenerator(new ExecutionPlanGenerator(logicalDag).generate(), nodeEngine).generate();
+ public static PhysicalPlan fromLogicalDAG(@NonNull LogicalDag logicalDag,
+ @NonNull NodeEngine nodeEngine,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull ExecutorService executorService,
+ @NonNull FlakeIdGenerator flakeIdGenerator) {
+ return new PhysicalPlanGenerator(
+ new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
+ nodeEngine,
+ jobImmutableInformation,
+ initializationTimestamp,
+ executorService,
+ flakeIdGenerator).generate();
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
new file mode 100644
index 000000000..fbc568a62
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.net.URL;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * PhysicalVertex is responsible for the scheduling and execution of a single task parallel
+ * Each {@link org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex} generates some PhysicalVertex.
+ * And the number of PhysicalVertex equals the {@link ExecutionVertex#getParallelism()}.
+ */
+public class PhysicalVertex {
+
+ private static final ILogger LOGGER = Logger.getLogger(PhysicalVertex.class);
+
+ /**
+ * the index of PhysicalVertex
+ */
+ private final int subTaskGroupIndex;
+
+ private final String taskNameWithSubtaskAndPipeline;
+
+ private final int parallelism;
+
+ private final TaskGroup taskGroup;
+
+ private final ExecutorService executorService;
+
+ private final FlakeIdGenerator flakeIdGenerator;
+
+ private final int pipelineIndex;
+
+ private final int totalPipelineNum;
+
+ private final Set<URL> pluginJarsUrls;
+
+ /**
+ * When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
+ * in {@link SubPlan} whenComplete method will be called.
+ */
+ private final CompletableFuture<TaskExecutionState> taskFuture;
+
+
+ /**
+ * This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
+ */
+ private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
+
+ public PhysicalVertex(int subTaskGroupIndex,
+ @NonNull ExecutorService executorService,
+ int parallelism,
+ @NonNull TaskGroup taskGroup,
+ @NonNull CompletableFuture<TaskExecutionState> taskFuture,
+ @NonNull FlakeIdGenerator flakeIdGenerator,
+ int pipelineIndex,
+ int totalPipelineNum,
+ Set<URL> pluginJarsUrls) {
+ this.subTaskGroupIndex = subTaskGroupIndex;
+ this.executorService = executorService;
+ this.parallelism = parallelism;
+ this.taskGroup = taskGroup;
+ this.flakeIdGenerator = flakeIdGenerator;
+ this.pipelineIndex = pipelineIndex;
+ this.totalPipelineNum = totalPipelineNum;
+ this.pluginJarsUrls = pluginJarsUrls;
+ this.taskNameWithSubtaskAndPipeline =
+ String.format(
+ "task: [%s (%d/%d)], pipeline: [%d/%d]",
+ taskGroup.getTaskGroupName(),
+ subTaskGroupIndex + 1,
+ parallelism,
+ pipelineIndex,
+ totalPipelineNum);
+ this.taskFuture = taskFuture;
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public void deploy() throws JobException {
+
+ // TODO really submit job to ExecutionService and get a NonCompletableFuture<ExecutionState>
+ long executionId = flakeIdGenerator.newId();
+ CompletableFuture<TaskExecutionState> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(5000);
+ return new TaskExecutionState(executionId, ExecutionState.FINISHED, null);
+ } catch (InterruptedException e) {
+ return new TaskExecutionState(executionId, ExecutionState.FAILED, e);
+ }
+ }, executorService);
+
+ waitForCompleteByExecutionService = new NonCompletableFuture<TaskExecutionState>(uCompletableFuture);
+ waitForCompleteByExecutionService.whenComplete((v, t) -> {
+ if (t != null) {
+ // TODO t.getMessage() need be replace
+ LOGGER.info(String.format("The Task %s Failed with Exception: %s",
+ this.taskNameWithSubtaskAndPipeline,
+ t.getMessage()));
+ taskFuture.complete(new TaskExecutionState(executionId, ExecutionState.FAILED, t));
+ } else {
+ LOGGER.info(String.format("The Task %s end with state %s",
+ this.taskNameWithSubtaskAndPipeline,
+ v));
+ taskFuture.complete(v);
+ }
+ });
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
new file mode 100644
index 000000000..8c1c4b0b6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SubPlan {
+ private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
+
+ private final List<PhysicalVertex> physicalVertexList;
+
+ private final List<PhysicalVertex> coordinatorVertexList;
+
+ private final int pipelineIndex;
+
+ private final int totalPipelineNum;
+
+ private AtomicInteger finishedTaskNum = new AtomicInteger(0);
+
+ private AtomicInteger canceledTaskNum = new AtomicInteger(0);
+
+ private AtomicInteger failedTaskNum = new AtomicInteger(0);
+
+ private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
+
+ private final String pipelineNameWithIndex;
+
+ /**
+ * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+ * pipeline transitioned into a certain state. The index into this array is the ordinal
+ * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
+ * stateTimestamps[RUNNING.ordinal()]}.
+ */
+ private final long[] stateTimestamps;
+
+ /**
+ * Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
+ * whenComplete method will be called.
+ */
+ private final CompletableFuture<PipelineState> pipelineFuture;
+
+ /**
+ * This future only can completion by the {@link PhysicalVertex } taskFuture.
+ * When the taskFuture in {@link PhysicalVertex} completed, The NonCompletableFuture's whenComplete method will be called
+ */
+ private final NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
+
+ public SubPlan(int pipelineIndex,
+ int totalPipelineNum,
+ long initializationTimestamp,
+ @NonNull List<PhysicalVertex> physicalVertexList,
+ @NonNull List<PhysicalVertex> coordinatorVertexList,
+ @NonNull CompletableFuture<PipelineState> pipelineFuture,
+ @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex) {
+ this.pipelineIndex = pipelineIndex;
+ this.pipelineFuture = pipelineFuture;
+ this.totalPipelineNum = totalPipelineNum;
+ this.physicalVertexList = physicalVertexList;
+ this.coordinatorVertexList = coordinatorVertexList;
+ this.waitForCompleteByPhysicalVertex = waitForCompleteByPhysicalVertex;
+ stateTimestamps = new long[PipelineState.values().length];
+ this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
+ this.pipelineState.set(PipelineState.CREATED);
+ this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+ this.pipelineNameWithIndex = String.format(
+ "pipeline: [(%d/%d)]",
+ pipelineIndex + 1,
+ totalPipelineNum);
+
+ Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
+ x.whenComplete((v, t) -> {
+ if (ExecutionState.CANCELED.equals(v)) {
+ canceledTaskNum.incrementAndGet();
+ } else if (ExecutionState.FAILED.equals(v)) {
+ failedTaskNum.incrementAndGet();
+ } else {
+ throw new JobException("Unknown Task end state [" + v + "]");
+ }
+
+ if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
+ if (failedTaskNum.get() > 0) {
+ LOGGER.info(String.format("Pipeline failed %s", this.pipelineNameWithIndex));
+ pipelineState.set(PipelineState.FAILED);
+ } else if (canceledTaskNum.get() > 0) {
+ LOGGER.info(String.format("Pipeline canceled %s", this.pipelineNameWithIndex));
+ pipelineState.set(PipelineState.CANCELED);
+ } else {
+ LOGGER.info(String.format("Pipeline finished %s", this.pipelineNameWithIndex));
+ pipelineState.set(PipelineState.FINISHED);
+ }
+ pipelineFuture.complete(pipelineState.get());
+ }
+ });
+ });
+ }
+
+ public int getPipelineIndex() {
+ return pipelineIndex;
+ }
+
+ public List<PhysicalVertex> getPhysicalVertexList() {
+ return physicalVertexList;
+ }
+
+ public List<PhysicalVertex> getCoordinatorVertexList() {
+ return coordinatorVertexList;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
new file mode 100644
index 000000000..b2db98f18
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+/**
+ * An enumeration of all states that a task can be in during its execution. Tasks usually start in
+ * the state {@code CREATED} and switch states according to this diagram:
+ *
+ * <pre>{@code
+ * CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
+ * | | | | |
+ * | | | +-----+--------------+
+ * | | V V
+ * | | CANCELLING -----+----> CANCELED
+ * | | |
+ * | +-------------------------+
+ * |
+ * | ... -> FAILED
+ * V
+ * RECONCILING -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ *
+ * }</pre>
+ *
+ * <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED} state if job
+ * manager fail over, and the {@code RECONCILING} state can switch into any existing task state.
+ *
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are considered terminal
+ * states.
+ */
+public enum ExecutionState {
+ CREATED,
+
+ SCHEDULED,
+
+ DEPLOYING,
+
+ RUNNING,
+
+ /**
+ * This state marks "successfully completed". It can only be reached when a program reaches the
+ * "end of its input". The "end of input" can be reached when consuming a bounded input (fix set
+ * of files, bounded query, etc) or when stopping a program (not cancelling!) which make the
+ * input look like it reached its end at a specific point.
+ */
+ FINISHED,
+
+ CANCELING,
+
+ CANCELED,
+
+ FAILED,
+
+ RECONCILING,
+
+ /** Restoring last possible valid state of the task if it has it. */
+ INITIALIZING;
+
+ public boolean isEnd() {
+ return this == FINISHED || this == CANCELED || this == FAILED;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index 2763942ee..546db419e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -17,19 +17,29 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public class TaskExecutionState {
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
+ private final long taskExecutionId;
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ private final ExecutionState executionState;
- public TaskGroup(Task... tasks) {
- this.tasks = Arrays.asList(tasks);
+ private Throwable throwable;
+
+ public TaskExecutionState(long taskExecutionId, ExecutionState executionState, Throwable throwable) {
+ this.taskExecutionId = taskExecutionId;
+ this.executionState = executionState;
+ this.throwable = throwable;
+ }
+
+ public ExecutionState getExecutionState() {
+ return executionState;
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
+ public long getTaskExecutionId() {
+ return taskExecutionId;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 2763942ee..c0123277c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -17,19 +17,14 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collection;
@Data
-@AllArgsConstructor
public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ private final String taskGroupName;
- public TaskGroup(Task... tasks) {
- this.tasks = Arrays.asList(tasks);
- }
+ private final Collection<Task> tasks;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 37542cd2d..6c9d89517 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,53 +17,67 @@
package org.apache.seatunnel.engine.server.master;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.OperationService;
import lombok.NonNull;
-import java.io.IOException;
+import java.util.concurrent.ExecutorService;
-public class JobMaster implements Task {
+public class JobMaster implements Runnable {
+ private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
- private final LogicalDag logicalDag;
+ private LogicalDag logicalDag;
private PhysicalPlan physicalPlan;
+ private final Data jobImmutableInformation;
- private NodeEngine nodeEngine;
+ private final NodeEngine nodeEngine;
- public JobMaster() {
- this.logicalDag = new LogicalDag();
- }
+ private final ExecutorService executorService;
- @Override
- public void init() throws Exception {
- physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine);
- }
+ private FlakeIdGenerator flakeIdGenerator;
- @NonNull
- @Override
- public ProgressState call() {
- return ProgressState.DONE;
+ public JobMaster(@NonNull Data jobImmutableInformation,
+ @NonNull NodeEngine nodeEngine,
+ @NonNull ExecutorService executorService) {
+ this.jobImmutableInformation = jobImmutableInformation;
+ this.nodeEngine = nodeEngine;
+ this.executorService = executorService;
+ flakeIdGenerator =
+ this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
}
- @NonNull
- @Override
- public Long getTaskID() {
- return null;
- }
+ public void init() throws Exception {
+ JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
+ LOGGER.info("Job [" + jobInformation.getJobId() + "] submit");
+ LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
- @Override
- public void close() throws IOException {
- Task.super.close();
+ // TODO Use classloader load the connector jars and deserialize logicalDag
+ this.logicalDag = new LogicalDag();
+ physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
+ nodeEngine,
+ jobInformation,
+ System.currentTimeMillis(),
+ executorService,
+ flakeIdGenerator);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
@Override
- public void setOperationService(OperationService operationService) {
- Task.super.setOperationService(operationService);
+ public void run() {
+ try {
+ LOGGER.info("I will sleep 2000ms");
+ Thread.sleep(2000);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index b72c62287..8fceed6c9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -15,19 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.server.scheduler;
-import java.util.List;
+public interface JobScheduler {
+ void startScheduling();
-public class ExecutionPlan {
-
- private final List<Pipeline> pipelines;
-
- ExecutionPlan(List<Pipeline> pipelines) {
- this.pipelines = pipelines;
- }
-
- public List<Pipeline> getPipelines() {
- return pipelines;
- }
+ boolean updateExecutionState();
}