You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/29 05:22:11 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add LogicalDag Generator (#2284)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new d9d01a3df [ST-Engine] Add LogicalDag Generator (#2284)
d9d01a3df is described below
commit d9d01a3df907fc88294700dcf87766d712f57912
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Jul 29 13:22:07 2022 +0800
[ST-Engine] Add LogicalDag Generator (#2284)
* Add logicaldag generator
* merge from upstream
* fix UT error
---
.../api/transform/PartitionSeaTunnelTransform.java | 4 +-
.../api/transform/SeaTunnelTransform.java | 10 +-
.../common/constants/CollectionConstants.java | 16 ++
seatunnel-engine/seatunnel-engine-client/pom.xml | 17 ++
.../engine/client/ConnectorInstanceLoader.java | 78 ++++++
.../seatunnel/engine/client/JobConfigParser.java | 274 +++++++++++++++++++++
.../engine/client/JobExecutionEnvironment.java | 44 ++--
.../seatunnel/engine/client/SeaTunnelClient.java | 11 +-
.../engine/client/SeaTunnelClientInstance.java | 4 +-
.../engine/client/JobConfigParserTest.java | 70 ++++++
.../engine/client/LogicalDagGeneratorTest.java | 52 ++++
.../apache/seatunnel/engine/client/TestUtils.java | 9 +-
.../src/test/resources/fakesource_to_file.conf | 65 +++++
.../test/resources/fakesource_to_file_complex.conf | 71 ++++++
seatunnel-engine/seatunnel-engine-common/pom.xml | 5 +
.../seatunnel/engine/common/config/JobConfig.java | 35 ++-
.../JobDefineCheckExceptionSeaTunnel.java | 12 +-
.../serializeable/ConfigDataSerializerHook.java} | 20 +-
.../serializeable/SeaTunnelFactoryIdConstant.java} | 23 +-
.../engine/common/utils/IdGenerator.java} | 20 +-
.../services/com.hazelcast.DataSerializerHook | 18 ++
.../engine/core/dag/actions/AbstractAction.java | 80 ++++++
.../seatunnel/engine/core/dag/actions/Action.java} | 26 +-
.../core/dag/actions/PartitionTransformAction.java | 43 ++++
.../engine/core/dag/actions/SinkAction.java | 44 ++++
.../engine/core/dag/actions/SourceAction.java} | 24 +-
.../engine/core/dag/actions/TransformAction.java} | 30 ++-
.../engine/core/dag/logicaldag/LogicalDag.java | 158 ++++++++++++
.../core/dag/logicaldag/LogicalDagGenerator.java | 94 +++++++
.../engine/core/dag/logicaldag/LogicalEdge.java | 81 ++++++
.../engine/core/dag/logicaldag/LogicalVertex.java | 64 +++++
.../core/serializable/JobDataSerializerHook.java} | 44 +++-
.../services/com.hazelcast.DataSerializerHook | 18 ++
.../serializable/OperationDataSerializerHook.java | 14 +-
.../SeaTunnelTransformPluginDiscovery.java | 22 +-
35 files changed, 1487 insertions(+), 113 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
similarity index 87%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
index d6cd39dc6..cccfbd716 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.common.config;
+package org.apache.seatunnel.api.transform;
-public class JobConfig {
+public interface PartitionSeaTunnelTransform extends SeaTunnelTransform {
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
similarity index 66%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 28f5dc4d7..4b3eb065f 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -15,9 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.api.transform;
-public class CollectionConstants {
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
+import org.apache.seatunnel.api.source.SeaTunnelContextAware;
- public static final int MAP_SIZE = 6;
+import java.io.Serializable;
+
+public interface SeaTunnelTransform extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
index 28f5dc4d7..fb387b605 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
@@ -20,4 +20,20 @@ package org.apache.seatunnel.common.constants;
public class CollectionConstants {
public static final int MAP_SIZE = 6;
+
+ public static final String PLUGIN_NAME = "plugin_name";
+
+ public static final String SEATUNNEL_PLUGIN = "seatunnel";
+
+ public static final String FLINK_PLUGIN = "flink";
+
+ public static final String SPARK_PLUGIN = "spark";
+
+ public static final String SOURCE_PLUGIN = "source";
+
+ public static final String TRANSFORM_PLUGIN = "transform";
+
+ public static final String SINK_PLUGIN = "sink";
+
+ public static final String PARALLELISM = "parallelism";
}
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 26c502f36..c82c6b56d 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -39,6 +39,11 @@
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-core</artifactId>
@@ -50,6 +55,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-local</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
new file mode 100644
index 000000000..7791cbec4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import scala.Serializable;
+
+/**
+ * Creates SeaTunnel connector plugin instances for the engine client.
+ *
+ * <p>Every loader resolves the plugin through the matching plugin discovery, using the
+ * {@code plugin_name} entry of the given config block, then passes the instance its config and
+ * the current {@link SeaTunnelContext}.
+ */
+public class ConnectorInstanceLoader {
+    /**
+     * Creates the source plugin named in {@code sourceConfig}, prepares it with that config and
+     * attaches the current {@link SeaTunnelContext}.
+     *
+     * @param sourceConfig config block of one source plugin; its {@code plugin_name} selects the plugin
+     * @return the prepared source instance
+     * @throws UnsupportedOperationException if the job mode is BATCH but the source reports UNBOUNDED
+     */
+    public static SeaTunnelSource loadSourceInstance(Config sourceConfig) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        PluginIdentifier pluginIdentifier = identifierOf(CollectionConstants.SOURCE_PLUGIN, sourceConfig);
+
+        SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+        seaTunnelSource.prepare(sourceConfig);
+        seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+        // A batch job must terminate, so an unbounded source is rejected up front.
+        if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+            && seaTunnelSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+            throw new UnsupportedOperationException(
+                String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
+        }
+        return seaTunnelSource;
+    }
+
+    /**
+     * Creates the sink plugin named in {@code sinkConfig}, prepares it with that config and
+     * attaches the current {@link SeaTunnelContext}.
+     *
+     * @param sinkConfig config block of one sink plugin; its {@code plugin_name} selects the plugin
+     * @return the prepared sink instance
+     */
+    public static SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> loadSinkInstance(
+        Config sinkConfig) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        PluginIdentifier pluginIdentifier = identifierOf(CollectionConstants.SINK_PLUGIN, sinkConfig);
+        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+            sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        seaTunnelSink.prepare(sinkConfig);
+        seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        return seaTunnelSink;
+    }
+
+    /**
+     * Creates the transform plugin named in {@code transformConfig}, prepares it with that config
+     * and attaches the current {@link SeaTunnelContext}.
+     *
+     * @param transformConfig config block of one transform plugin; its {@code plugin_name} selects the plugin
+     * @return the prepared transform instance
+     */
+    public static SeaTunnelTransform loadTransformInstance(Config transformConfig) {
+        SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
+        PluginIdentifier pluginIdentifier = identifierOf(CollectionConstants.TRANSFORM_PLUGIN, transformConfig);
+        SeaTunnelTransform seaTunnelTransform = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+        // SeaTunnelTransform is a SeaTunnelPluginLifeCycle and SeaTunnelContextAware just like the
+        // source and sink plugins, so it gets the same initialisation instead of being returned
+        // without its config and context.
+        seaTunnelTransform.prepare(transformConfig);
+        seaTunnelTransform.setSeaTunnelContext(SeaTunnelContext.getContext());
+        return seaTunnelTransform;
+    }
+
+    /**
+     * Builds the identifier of a SeaTunnel-engine plugin of the given type, taking the plugin
+     * name from the {@code plugin_name} entry of its config block.
+     */
+    private static PluginIdentifier identifierOf(String pluginType, Config pluginConfig) {
+        return PluginIdentifier.of(
+            CollectionConstants.SEATUNNEL_PLUGIN,
+            pluginType,
+            pluginConfig.getString(CollectionConstants.PLUGIN_NAME));
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
new file mode 100644
index 000000000..7ddedd792
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Serializable;
+
+@Data
+public class JobConfigParser {
+ private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
+ private String jobDefineFilePath;
+ private IdGenerator idGenerator;
+
+ private Map<Action, String> alreadyTransformActionMap = new HashMap<>();
+
+ private Map<String, List<Config>> transformResultTableNameMap = new HashMap<>();
+ private Map<String, List<Config>> transformSourceTableNameMap = new HashMap<>();
+
+ private Map<String, List<Config>> sourceResultTableNameMap = new HashMap<>();
+
+ protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator) {
+ this.jobDefineFilePath = jobDefineFilePath;
+ this.idGenerator = idGenerator;
+ }
+
+ public List<Action> parse() {
+ Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+ List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
+ List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
+ List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
+
+ if (sinkConfigs.size() == 1 && sourceConfigs.size() == 1 & transformConfigs.size() <= 1) {
+ return sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+ } else {
+ return complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+ }
+ }
+
+ /**
+ * If there are multiple sources or multiple transforms or multiple sink, We will rely on
+ * source_table_name and result_table_name to build actions pipeline.
+ * So in this case result_table_name is necessary for the Source Connector and all of
+ * result_table_name and source_table_name are necessary for Transform Connector.
+ * By the end, source_table_name is necessary for Sink Connector.
+ */
+ private List<Action> complexAnalyze(List<? extends Config> sourceConfigs,
+ List<? extends Config> transformConfigs,
+ List<? extends Config> sinkConfigs) {
+ initRelationMap(sourceConfigs, transformConfigs);
+
+ List<Action> actions = new ArrayList<>();
+ for (Config config : sinkConfigs) {
+ SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+ ConnectorInstanceLoader.loadSinkInstance(config);
+
+ SinkAction sinkAction =
+ new SinkAction(idGenerator.getNextId(), seaTunnelSink.getPluginName(), seaTunnelSink);
+ actions.add(sinkAction);
+ if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
+ throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+ + " must be set in the sink plugin config when the job have complex dependencies");
+ }
+ String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
+ List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
+ if (CollectionUtils.isEmpty(transformConfigList)) {
+ sourceAnalyze(sourceTableName, sinkAction);
+ } else if (transformConfigList.size() > 1) {
+ throw new JobDefineCheckExceptionSeaTunnel("Only UnionTransform can have more than one upstream, "
+ + sinkAction.getName()
+ + " is not UnionTransform Connector");
+ } else {
+ transformAnalyze(sourceTableName, sinkAction);
+ }
+
+ }
+ return actions;
+ }
+
+ private void sourceAnalyze(String sourceTableName, Action action) {
+ List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
+ if (CollectionUtils.isEmpty(sourceConfigList)) {
+ throw new JobDefineCheckExceptionSeaTunnel(action.getName()
+ + " source table name [" + sourceTableName + "] can not be found");
+ }
+
+ // If a transform have more than one upstream action, the parallelism of this transform is the sum of the parallelism
+ // of its upstream action.
+ AtomicInteger totalParallelism = new AtomicInteger();
+ sourceConfigList.stream().forEach(sourceConfig -> {
+ SourceAction sourceAction =
+ new SourceAction(idGenerator.getNextId(),
+ sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfig));
+ int sourceParallelism = getSourceParallelism(sourceConfig);
+ sourceAction.setParallelism(sourceParallelism);
+ totalParallelism.set(totalParallelism.get() + sourceParallelism);
+ action.addUpstream(sourceAction);
+ action.setParallelism(totalParallelism.get());
+ });
+ }
+
+ private void transformAnalyze(String sourceTableName, Action action) {
+ // find upstream transform node
+ List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
+ if (CollectionUtils.isEmpty(transformConfigList)) {
+ sourceAnalyze(sourceTableName, action);
+ } else {
+ AtomicInteger totalParallelism = new AtomicInteger();
+ transformConfigList.stream().forEach(config -> {
+ SeaTunnelTransform seaTunnelTransform = ConnectorInstanceLoader.loadTransformInstance(config);
+ TransformAction transformAction =
+ new TransformAction(idGenerator.getNextId(), seaTunnelTransform.getPluginName(),
+ seaTunnelTransform);
+ action.addUpstream(transformAction);
+ transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME), transformAction);
+ totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
+ action.setParallelism(totalParallelism.get());
+ });
+ }
+ }
+
+ private void initRelationMap(List<? extends Config> sourceConfigs, List<? extends Config> transformConfigs) {
+ for (Config config : sourceConfigs) {
+ if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
+ throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+ + " must be set in the source plugin config when the job have complex dependencies");
+ }
+ String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
+ if (sourceResultTableNameMap.get(resultTableName) == null) {
+ sourceResultTableNameMap.put(resultTableName, new ArrayList<>());
+ }
+ sourceResultTableNameMap.get(resultTableName).add(config);
+ }
+
+ for (Config config : transformConfigs) {
+ if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
+ throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+ + " must be set in the transform plugin config when the job have complex dependencies");
+ }
+
+ if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
+ throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+ + " must be set in the transform plugin config when the job have complex dependencies");
+ }
+ String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
+ String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
+
+ if (transformResultTableNameMap.get(resultTableName) == null) {
+ transformResultTableNameMap.put(resultTableName, new ArrayList<>());
+ }
+ transformResultTableNameMap.get(resultTableName).add(config);
+
+ if (transformSourceTableNameMap.get(sourceTableName) == null) {
+ transformSourceTableNameMap.put(sourceTableName, new ArrayList<>());
+ }
+ transformSourceTableNameMap.get(sourceTableName).add(config);
+
+ }
+ }
+
+ /**
+ * If there is only one Source and one Sink and at most one Transform, We simply build actions pipeline in the following order
+ * Source
+ * |
+ * Transform(If have)
+ * |
+ * Sink
+ */
+ private List<Action> sampleAnalyze(List<? extends Config> sourceConfigs,
+ List<? extends Config> transformConfigs,
+ List<? extends Config> sinkConfigs) {
+ SeaTunnelSource seaTunnelSource = ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+ SourceAction sourceAction =
+ new SourceAction(idGenerator.getNextId(), seaTunnelSource.getPluginName(), seaTunnelSource);
+ sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
+ if (transformConfigs.size() != 0) {
+ SeaTunnelTransform seaTunnelTransform =
+ ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
+ TransformAction transformAction = new TransformAction(
+ idGenerator.getNextId(),
+ seaTunnelTransform.getPluginName(),
+ Lists.newArrayList(sourceAction),
+ seaTunnelTransform);
+
+ initTransformParallelism(transformConfigs, sourceAction, seaTunnelTransform, transformAction);
+
+ SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+ SinkAction sinkAction = new SinkAction(
+ idGenerator.getNextId(),
+ seaTunnelSink.getPluginName(),
+ Lists.newArrayList(transformAction),
+ seaTunnelSink
+ );
+ sinkAction.setParallelism(transformAction.getParallelism());
+ return Lists.newArrayList(sinkAction);
+ } else {
+ SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+ SinkAction sinkAction = new SinkAction(
+ idGenerator.getNextId(),
+ seaTunnelSink.getPluginName(),
+ Lists.newArrayList(sourceAction),
+ seaTunnelSink
+ );
+ sinkAction.setParallelism(sourceAction.getParallelism());
+ return Lists.newArrayList(sinkAction);
+ }
+ }
+
+ private void initTransformParallelism(List<? extends Config> transformConfigs, Action upstreamAction,
+ SeaTunnelTransform seaTunnelTransform, TransformAction transformAction) {
+ if ((seaTunnelTransform instanceof PartitionSeaTunnelTransform)
+ && transformConfigs.get(0).hasPath(CollectionConstants.PARALLELISM)) {
+ transformAction.setParallelism(transformConfigs
+ .get(0)
+ .getInt(CollectionConstants.PARALLELISM));
+ } else {
+ // If transform type is not RePartitionTransform, Using the parallelism of its upstream operators.
+ transformAction.setParallelism(upstreamAction.getParallelism());
+ }
+ }
+
+ private int getSourceParallelism(Config sourceConfig) {
+ if (sourceConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ int sourceParallelism = sourceConfig.getInt(CollectionConstants.PARALLELISM);
+ return sourceParallelism < 1 ? 1 : sourceParallelism;
+ }
+ return 1;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 9cc0bfb61..b00bbcc27 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -18,48 +18,48 @@
package org.apache.seatunnel.engine.client;
-import org.apache.seatunnel.api.transform.Transformation;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
import java.util.ArrayList;
import java.util.List;
public class JobExecutionEnvironment {
- private List<Transformation> transformations;
-
private static String DEFAULT_JOB_NAME = "test_st_job";
- private SeaTunnelClientConfig configuration;
-
- private String jobName;
+ private JobConfig jobConfig;
private int maxParallelism = 1;
- public JobExecutionEnvironment(SeaTunnelClientConfig configuration) {
- this.configuration = configuration;
- }
+ private List<Action> actions = new ArrayList<>();
+
+ private String jobFilePath;
+
+ private IdGenerator idGenerator;
- public void addTransformation(Transformation transformation) {
- if (transformations == null) {
- transformations = new ArrayList<>();
- }
- this.transformations.add(transformation);
+ public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath) {
+ this.jobConfig = jobConfig;
+ this.jobFilePath = jobFilePath;
+ this.idGenerator = new IdGenerator();
}
- public List<Transformation> getTransformations() {
- return transformations;
+ public JobConfigParser getJobConfigParser() {
+ return new JobConfigParser(jobFilePath, idGenerator);
}
- public void setTransformations(List<Transformation> transformations) {
- this.transformations = transformations;
+ public void addAction(List<Action> actions) {
+ this.actions.addAll(actions);
}
- public void setJobName(String jobName) {
- this.jobName = jobName;
+ public LogicalDagGenerator getLogicalDagGenerator() {
+ return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}
- public void setMaxParallelism(int maxParallelism) {
- this.maxParallelism = maxParallelism;
+ public List<Action> getActions() {
+ return actions;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 652883a7d..4221ed873 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
@@ -40,7 +41,8 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
Preconditions.checkNotNull(seaTunnelClientConfig, "config");
- this.hazelcastClient = ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+ this.hazelcastClient =
+ ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
this.serializationService = hazelcastClient.getSerializationService();
ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
}
@@ -52,9 +54,10 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
}
@Override
- public JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config) {
- // TODO analyze job config file and create LocalExecutionContext
- return null;
+ public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
+ JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
+ jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
+ return jobExecutionEnv;
}
public ILogger getLogger() {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 34ca898d9..1e0db46dd 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
import com.hazelcast.core.HazelcastInstance;
import lombok.NonNull;
@@ -29,5 +31,5 @@ public interface SeaTunnelClientInstance {
@NonNull
HazelcastInstance getHazelcastInstance();
- JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+ JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
new file mode 100644
index 000000000..f8fd9c873
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/**
+ * Checks the action pipelines that {@link JobConfigParser} builds from the sample job files.
+ */
+@RunWith(JUnit4.class)
+public class JobConfigParserTest {
+
+    /** One FakeSource feeding one LocalFile sink: the sink takes over the source parallelism. */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Test
+    public void testSimpleJobParse() {
+        Action sink = parseSingleSink("/fakesource_to_file.conf");
+
+        Assert.assertEquals("LocalFile", sink.getName());
+        Assert.assertEquals(1, sink.getUpstream().size());
+        Action source = sink.getUpstream().get(0);
+        Assert.assertEquals("FakeSource", source.getName());
+
+        Assert.assertEquals(3, source.getParallelism());
+        Assert.assertEquals(3, sink.getParallelism());
+    }
+
+    /** Two FakeSources feeding one LocalFile sink: the sink parallelism is the sum of both. */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Test
+    public void testComplexJobParse() {
+        Action sink = parseSingleSink("/fakesource_to_file_complex.conf");
+
+        Assert.assertEquals("LocalFile", sink.getName());
+        Assert.assertEquals(2, sink.getUpstream().size());
+        Action firstSource = sink.getUpstream().get(0);
+        Action secondSource = sink.getUpstream().get(1);
+        Assert.assertEquals("FakeSource", firstSource.getName());
+        Assert.assertEquals("FakeSource", secondSource.getName());
+
+        Assert.assertEquals(3, firstSource.getParallelism());
+        Assert.assertEquals(3, secondSource.getParallelism());
+        Assert.assertEquals(6, sink.getParallelism());
+    }
+
+    /**
+     * Parses the given test resource in CLIENT deploy mode, asserts that exactly one action was
+     * returned and hands that action back.
+     */
+    private static Action parseSingleSink(String resourceName) {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource(resourceName);
+        List<Action> actions = new JobConfigParser(filePath, new IdGenerator()).parse();
+        Assert.assertEquals(1, actions.size());
+        return actions.get(0);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
new file mode 100644
index 000000000..856c88aec
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+
+import com.hazelcast.internal.json.JsonObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Checks the JSON form of the logical DAG generated from {@code fakesource_to_file_complex.conf}.
+ */
+@RunWith(JUnit4.class)
+public class LogicalDagGeneratorTest {
+    /**
+     * Builds a bounded job named {@code fake_to_file}, feeds the parsed actions into the
+     * execution environment and compares the generated DAG's JSON with the expected literal.
+     */
+    @Test
+    public void testLogicalGenerator() {
+        Common.setDeployMode(DeployMode.CLIENT);
+
+        JobConfig config = new JobConfig();
+        config.setName("fake_to_file");
+        config.setBoundedness(Boundedness.BOUNDED);
+
+        String confPath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        JobExecutionEnvironment environment = new JobExecutionEnvironment(config, confPath);
+        environment.addAction(environment.getJobConfigParser().parse());
+
+        LogicalDagGenerator generator = environment.getLogicalDagGenerator();
+        LogicalDag dag = generator.generate();
+        JsonObject dagJson = dag.getLogicalDagAsJson();
+
+        String expectedJson =
+            "{\"vertices\":[{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6}],\"edges\":[{\"leftVertex\":\"FakeSource\",\"rightVertex\":\"LocalFile\"},{\"leftVertex\":\"FakeSource\",\"rightVertex\":\"LocalFile\"}]}";
+        Assert.assertEquals(expectedJson, dagJson.toString());
+    }
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
similarity index 79%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
index 28f5dc4d7..c392fdaf5 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.engine.client;
-public class CollectionConstants {
-
- public static final int MAP_SIZE = 6;
+public class TestUtils {
+ public static String getResource(String confFile) {
+ return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
new file mode 100644
index 000000000..c6f0f7bbf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
new file mode 100644
index 000000000..dc73c2f09
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error",
+ source_table_name="fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index c62993d55..afad57006 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -42,5 +42,10 @@
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index d6cd39dc6..a33814677 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,5 +17,38 @@
package org.apache.seatunnel.engine.common.config;
-public class JobConfig {
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.Data;
+
+import java.io.IOException;
+
+/**
+ * Job-level settings that travel between client and server through Hazelcast's
+ * {@link IdentifiedDataSerializable} mechanism.
+ *
+ * <p>Every field must be written in {@link #writeData} and read back in the same order in
+ * {@link #readData}; a field that is left out arrives as {@code null} on the receiving side.
+ */
+@Data
+public class JobConfig implements IdentifiedDataSerializable {
+    private String name;
+    private Boundedness boundedness;
+
+    @Override
+    public int getFactoryId() {
+        return ConfigDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ConfigDataSerializerHook.JOB_CONFIG;
+    }
+
+    /**
+     * Writes {@code name} followed by {@code boundedness}.
+     *
+     * @param out the Hazelcast output to write to
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeString(name);
+        // Previously omitted, which dropped the boundedness on every serialization round trip.
+        out.writeObject(boundedness);
+    }
+
+    /**
+     * Restores the fields in the order {@link #writeData} wrote them.
+     *
+     * @param in the Hazelcast input to read from
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        this.name = in.readString();
+        this.boundedness = in.readObject();
+    }
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
similarity index 70%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
index 28f5dc4d7..2d0e10fe6 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
@@ -15,9 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.engine.common.exception;
-public class CollectionConstants {
+public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
- public static final int MAP_SIZE = 6;
+ public JobDefineCheckExceptionSeaTunnel(String message) {
+ super(message);
+ }
+
+ public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
index 807602605..a5758a515 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.serializable;
-
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+package org.apache.seatunnel.engine.common.serializeable;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-public class OperationDataSerializerHook implements DataSerializerHook {
- public static final int PRINT_MESSAGE_OPERATOR = 0;
+public class ConfigDataSerializerHook implements DataSerializerHook {
+ /**
+ * Serialization ID of the {@link org.apache.seatunnel.engine.common.config.JobConfig} class.
+ */
+ public static final int JOB_CONFIG = 0;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+ SeaTunnelFactoryIdConstant.SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY,
+ SeaTunnelFactoryIdConstant.SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID
);
@Override
@@ -47,8 +49,8 @@ public class OperationDataSerializerHook implements DataSerializerHook {
@Override
public IdentifiedDataSerializable create(int typeId) {
switch (typeId) {
- case PRINT_MESSAGE_OPERATOR:
- return new PrintMessageOperation();
+ case JOB_CONFIG:
+ return new JobConfig();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index e062bf67d..9cfcc54f8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -15,15 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.common.serializeable;
/**
* Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
* mechanism.
*/
-public class OperationFactoryIdConstant {
- /** Name of the system property that specifies SeaTunnelEngine's data serialization factory ID. */
- public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
- /** Default ID of SeaTunnelEngine's data serialization factory. */
+public final class SeaTunnelFactoryIdConstant {
+ /**
+ * Name of the system property that specifies SeaTunnelEngine's data serialization factory ID.
+ */
+ public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY =
+ "hazelcast.serialization.ds.seatunnel.engine.operation";
+ /**
+ * Default ID of SeaTunnelEngine's data serialization factory.
+ */
public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
+
+ public static final String SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY =
+ "hazelcast.serialization.ds.seatunnel.engine.job";
+ public static final int SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID = -30002;
+
+ public static final String SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY =
+ "hazelcast.serialization.ds.seatunnel.engine.config";
+ public static final int SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID = -30003;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
similarity index 57%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index e062bf67d..e05cbfa4d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.common.utils;
+
+import java.io.Serializable;
/**
- * Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
- * mechanism.
+ * It is used to generate the ID of each vertex in DAG. We just need to ensure that the id of all Vertices in a DAG are
+ * unique.
*/
-public class OperationFactoryIdConstant {
- /** Name of the system property that specifies SeaTunnelEngine's data serialization factory ID. */
- public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
- /** Default ID of SeaTunnelEngine's data serialization factory. */
- public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
+public class IdGenerator implements Serializable {
+    // Last value handed out; 0 means no id has been issued yet.
+    private int id = 0;
+
+    /**
+     * Advances the counter by one and returns the new value, so the first call yields 1.
+     *
+     * @return the next id in the sequence
+     */
+    public int getNextId() {
+        return ++id;
+    }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
new file mode 100644
index 000000000..db10e7aa3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
new file mode 100644
index 000000000..d85b2873f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base implementation of {@link Action} that stores the id, name, upstream list and parallelism
+ * shared by all concrete actions. Parallelism starts at 1 until {@link #setParallelism} is called.
+ */
+public abstract class AbstractAction implements Action {
+    // Identifier supplied by the creator; meant to be unique for every Action.
+    private int id;
+    private String name;
+    private List<Action> upstreams;
+    private int parallelism = 1;
+
+    /**
+     * Creates an action with no upstream actions yet; they can be attached later through
+     * {@link #addUpstream}.
+     *
+     * @param id   identifier of this action
+     * @param name display name of this action
+     */
+    protected AbstractAction(int id, @NonNull String name) {
+        this(id, name, new ArrayList<>());
+    }
+
+    /**
+     * Creates an action on top of the given upstream list. The list is kept by reference, so
+     * {@link #addUpstream} appends to the very list passed in here.
+     *
+     * @param id        identifier of this action
+     * @param name      display name of this action
+     * @param upstreams actions feeding into this one
+     */
+    protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams) {
+        this.id = id;
+        this.name = name;
+        this.upstreams = upstreams;
+    }
+
+    @Override
+    public int getId() {
+        return id;
+    }
+
+    @NonNull
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(@NonNull String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the internal upstream list itself, not a copy
+     */
+    @NonNull
+    @Override
+    public List<Action> getUpstream() {
+        return upstreams;
+    }
+
+    @Override
+    public void addUpstream(@NonNull Action action) {
+        upstreams.add(action);
+    }
+
+    @Override
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
similarity index 65%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 34ca898d9..86e39c62d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -15,19 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
-import com.hazelcast.core.HazelcastInstance;
import lombok.NonNull;
-public interface SeaTunnelClientInstance {
+import java.io.Serializable;
+import java.util.List;
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
+public interface Action extends Serializable {
@NonNull
- HazelcastInstance getHazelcastInstance();
+ String getName();
- JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+ void setName(@NonNull String name);
+
+ @NonNull
+ List<Action> getUpstream();
+
+ void addUpstream(@NonNull Action action);
+
+ int getParallelism();
+
+ void setParallelism(int parallelism);
+
+ int getId();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
new file mode 100644
index 000000000..25377c4bd
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+/**
+ * Action that wraps a {@link PartitionSeaTunnelTransform}.
+ */
+public class PartitionTransformAction extends AbstractAction {
+    private final PartitionSeaTunnelTransform partitionTransformation;
+
+    /**
+     * @param id                      identifier of this action
+     * @param name                    display name of this action
+     * @param upstreams               actions feeding into this one
+     * @param partitionTransformation the transform carried by this action
+     */
+    public PartitionTransformAction(int id,
+                                    @NonNull String name,
+                                    @NonNull List<Action> upstreams,
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
+        super(id, name, upstreams);
+        this.partitionTransformation = partitionTransformation;
+    }
+
+    /**
+     * Creates the action without upstream actions.
+     *
+     * @param id                      identifier of this action
+     * @param name                    display name of this action
+     * @param partitionTransformation the transform carried by this action
+     */
+    public PartitionTransformAction(int id,
+                                    @NonNull String name,
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
+        super(id, name);
+        this.partitionTransformation = partitionTransformation;
+    }
+
+    /**
+     * Exposes the wrapped transform; without this accessor the field could be set but never read.
+     *
+     * @return the transform passed to the constructor
+     */
+    public PartitionSeaTunnelTransform getPartitionTransformation() {
+        return partitionTransformation;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
new file mode 100644
index 000000000..8dac82faf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+/**
+ * Action that wraps a {@link SeaTunnelSink}.
+ */
+@SuppressWarnings("checkstyle:ClassTypeParameterName")
+public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends AbstractAction {
+    private final SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+
+    /**
+     * @param id        identifier of this action
+     * @param name      display name of this action
+     * @param upstreams actions feeding into this sink
+     * @param sink      the sink carried by this action
+     */
+    public SinkAction(int id,
+                      @NonNull String name,
+                      @NonNull List<Action> upstreams,
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
+        super(id, name, upstreams);
+        this.sink = sink;
+    }
+
+    /**
+     * Creates the action without upstream actions.
+     *
+     * @param id   identifier of this action
+     * @param name display name of this action
+     * @param sink the sink carried by this action
+     */
+    public SinkAction(int id,
+                      @NonNull String name,
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
+        super(id, name);
+        this.sink = sink;
+    }
+
+    /**
+     * Exposes the wrapped sink; without this accessor the field could be set but never read.
+     *
+     * @return the sink passed to the constructor
+     */
+    public SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> getSink() {
+        return sink;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 34ca898d9..5599b0e5d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -15,19 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
-import com.hazelcast.core.HazelcastInstance;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import com.google.common.collect.Lists;
import lombok.NonNull;
-public interface SeaTunnelClientInstance {
+import java.io.Serializable;
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
- @NonNull
- HazelcastInstance getHazelcastInstance();
+public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
+ private SeaTunnelSource<T, SplitT, StateT> source;
- JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+ public SourceAction(int id,
+ @NonNull String name,
+ @NonNull SeaTunnelSource<T, SplitT, StateT> source) {
+ super(id, name, Lists.newArrayList());
+ this.source = source;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index 34ca898d9..afea5986d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -15,19 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import com.hazelcast.core.HazelcastInstance;
import lombok.NonNull;
-public interface SeaTunnelClientInstance {
+import java.util.List;
+
+public class TransformAction extends AbstractAction {
+ private SeaTunnelTransform transformation; // the wrapped transform; assigned in both constructors, no accessor is declared in this class
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
- @NonNull
- HazelcastInstance getHazelcastInstance();
+ public TransformAction(int id, // variant that receives the upstream actions explicitly
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull SeaTunnelTransform transformation) {
+ super(id, name, upstreams);
+ this.transformation = transformation;
+ }
- JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+ public TransformAction(int id, // variant without upstreams; NOTE(review): the resulting upstream list depends on AbstractAction(id, name), not visible here
+ @NonNull String name,
+ @NonNull SeaTunnelTransform transformation) {
+ super(id, name);
+ this.transformation = transformation;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
new file mode 100644
index 000000000..ed4f3f2fc
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A LogicalDag describes the logical plan run by SeaTunnel Engine.
+ * {@link LogicalVertex} defines an operator, and {@link LogicalEdge} defines the
+ * relationship between two operators.
+ * <p>
+ * A {@link LogicalVertex} is not a final executable object. It will be optimized when
+ * the PhysicalDag is generated in JobMaster.
+ * <p>
+ * There are three basic kinds of vertices:
+ * <ol><li>
+ * <em>SeaTunnelSource</em> with just outbound edges;
+ * </li><li>
+ * <em>SeaTunnelTransform</em> with both inbound and outbound edges;
+ * </li><li>
+ * <em>SeaTunnelSink</em> with just inbound edges.
+ * </li></ol>
+ * Data travels from sources to sinks and is transformed and reshaped
+ * as it passes through the processors.
+ */
+public class LogicalDag implements IdentifiedDataSerializable {
+ private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class); // not referenced by any method of this class
+ private JobConfig jobConfig;
+ private Set<LogicalEdge> edges = new LinkedHashSet<>(); // insertion-ordered, so writeData and the JSON view emit edges in the order they were added
+ private Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>(); // vertex id -> vertex, insertion-ordered
+ private IdGenerator idGenerator;
+
+ public LogicalDag() { // no-arg constructor; this is the one JobDataSerializerHook#create calls
+ }
+
+ public LogicalDag(@NonNull JobConfig jobConfig,
+ @NonNull IdGenerator idGenerator) {
+ this.jobConfig = jobConfig;
+ this.idGenerator = idGenerator;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return JobDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return JobDataSerializerHook.LOGICAL_DAG;
+ }
+
+ public void addLogicalVertex(LogicalVertex logicalVertex) { // keyed by the vertex's own id; a vertex with the same id replaces the earlier one
+ logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+ }
+
+ public void addEdge(LogicalEdge logicalEdge) { // Set semantics: an edge equal to an existing one is not added again
+ edges.add(logicalEdge);
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException { // wire order: vertex count, (id, vertex) pairs, edge count, edges, jobConfig, idGenerator
+ out.writeInt(logicalVertexMap.size());
+
+ for (Map.Entry<Integer, LogicalVertex> entry : logicalVertexMap.entrySet()) {
+ out.writeInt(entry.getKey());
+ out.writeObject(entry.getValue());
+ }
+
+ out.writeInt(edges.size());
+
+ for (LogicalEdge edge : edges) {
+ out.writeObject(edge);
+ }
+
+ out.writeObject(jobConfig);
+ out.writeObject(idGenerator);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException { // reads the fields back in exactly the order writeData wrote them
+ int vertexCount = in.readInt();
+
+ for (int i = 0; i < vertexCount; i++) {
+ Integer key = in.readInt();
+ LogicalVertex value = in.readObject();
+ logicalVertexMap.put(key, value);
+ }
+
+ int edgeCount = in.readInt();
+
+ for (int i = 0; i < edgeCount; i++) {
+ LogicalEdge edge = in.readObject();
+ edge.recoveryFromVertexMap(logicalVertexMap); // edges carry only vertex ids on the wire; re-link them to the vertices read above
+ edges.add(edge); // added only after re-linking, once the edge's vertex fields are populated
+ }
+
+ jobConfig = in.readObject();
+ idGenerator = in.readObject();
+ }
+
+ @NonNull
+ public JsonObject getLogicalDagAsJson() { // builds a JSON object with a "vertices" array and an "edges" array
+ JsonObject logicalDag = new JsonObject();
+ JsonArray vertices = new JsonArray();
+
+ logicalVertexMap.values().stream().forEach(v -> {
+ JsonObject vertex = new JsonObject();
+ vertex.add("id", v.getVertexId());
+ vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")"); // action name with the vertex id appended
+ vertex.add("parallelism", v.getParallelism());
+ vertices.add(vertex);
+ });
+ logicalDag.add("vertices", vertices);
+
+ JsonArray edges = new JsonArray(); // local variable shadows the edges field; the field is reached via this.edges below
+ this.edges.stream().forEach(e -> {
+ JsonObject edge = new JsonObject();
+ edge.add("leftVertex", e.getLeftVertex().getAction().getName()); // bare action name, without the "(id=...)" suffix used for vertices
+ edge.add("rightVertex", e.getRightVertex().getAction().getName());
+ edges.add(edge);
+ });
+
+ logicalDag.add("edges", edges);
+ return logicalDag;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
new file mode 100644
index 000000000..a08c48301
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LogicalDagGenerator { // builds a LogicalDag from a list of Actions by walking each action's upstream actions
+ private static final ILogger LOGGER = Logger.getLogger(LogicalDagGenerator.class); // not referenced by any method of this class
+ private List<Action> actions;
+ private LogicalDag logicalDag;
+ private JobConfig jobConfig;
+ private IdGenerator idGenerator;
+
+ private Map<Action, Collection<Integer>> alreadyTransformed = new HashMap<>(); // memo: action -> ids of the vertices already created for it
+
+ private Map<Integer, LogicalVertex> logicalIdVertexMap = new HashMap<>(); // vertex id -> vertex, used to resolve upstream ids when creating edges
+
+ public LogicalDagGenerator(@NonNull List<Action> actions,
+ @NonNull JobConfig jobConfig,
+ @NonNull IdGenerator idGenerator) {
+ this.actions = actions;
+ this.jobConfig = jobConfig;
+ this.idGenerator = idGenerator;
+ if (actions.size() <= 0) { // an empty action list is rejected with IllegalStateException at construction time
+ throw new IllegalStateException("No actions define in the job. Cannot execute.");
+ }
+ }
+
+ public LogicalDag generate() { // NOTE(review): alreadyTransformed is never cleared, so a second call on the same instance looks like it would return an empty DAG - confirm single use is intended
+ logicalDag = new LogicalDag(jobConfig, idGenerator);
+ for (Action action : actions) {
+ transformAction(action);
+ }
+ return logicalDag;
+ }
+
+ private Collection<Integer> transformAction(Action action) { // returns the ids of the vertices created for this action (a single-element list)
+ if (alreadyTransformed.containsKey(action)) { // an action reachable from several downstream actions is converted only once; lookup relies on Action's equals/hashCode
+ return alreadyTransformed.get(action);
+ }
+
+ Collection<Integer> upstreamVertexIds = new ArrayList<>();
+ List<Action> upstream = action.getUpstream();
+ if (!CollectionUtils.isEmpty(upstream)) {
+ for (Action upstreamAction : upstream) {
+ upstreamVertexIds.addAll(transformAction(upstreamAction)); // recurse first, so upstream vertices are added to the DAG before this one
+ }
+ }
+
+ LogicalVertex logicalVertex =
+ new LogicalVertex(action.getId(), action, action.getParallelism()); // the vertex id is the action id
+ logicalDag.addLogicalVertex(logicalVertex);
+ Collection<Integer> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
+ alreadyTransformed.put(action, transformedActions);
+ logicalIdVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+
+ if (!CollectionUtils.isEmpty(upstreamVertexIds)) {
+ upstreamVertexIds.stream().forEach(id -> {
+ LogicalEdge logicalEdge = new LogicalEdge(logicalIdVertexMap.get(id), logicalVertex); // one edge per upstream vertex: upstream on the left, this vertex on the right
+ logicalDag.addEdge(logicalEdge);
+ });
+ }
+ return transformedActions;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
new file mode 100644
index 000000000..ac48b4e37
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.Data;
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.util.Map;
+
+@Data
+public class LogicalEdge implements IdentifiedDataSerializable { // an edge between two LogicalVertex objects; only the two vertex ids are serialized
+ private LogicalVertex leftVertex; // not written by writeData; restored by recoveryFromVertexMap
+ private LogicalVertex rightVertex; // not written by writeData; restored by recoveryFromVertexMap
+
+ private Integer leftVertexId;
+
+ private Integer rightVertexId;
+
+ public LogicalEdge(){} // no-arg constructor; this is the one JobDataSerializerHook#create calls
+
+ public LogicalEdge(LogicalVertex leftVertex, LogicalVertex rightVertex) { // both arguments are dereferenced below, so a null vertex throws NullPointerException
+ this.leftVertex = leftVertex;
+ this.rightVertex = rightVertex;
+ this.leftVertexId = leftVertex.getVertexId();
+ this.rightVertexId = rightVertex.getVertexId();
+ }
+
+ public void recoveryFromVertexMap(@NonNull Map<Integer, LogicalVertex> vertexMap) { // re-links the vertex references from the stored ids
+ leftVertex = vertexMap.get(leftVertexId);
+ rightVertex = vertexMap.get(rightVertexId);
+
+ checkNotNull(leftVertex); // fails if the map has no vertex for leftVertexId
+ checkNotNull(rightVertex); // fails if the map has no vertex for rightVertexId
+ }
+
+ @Override
+ public int getFactoryId() {
+ return JobDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return JobDataSerializerHook.LOGICAL_EDGE;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ // To prevent circular serialization, we only serialize the ID of vertices for edges
+ out.writeInt(leftVertexId); // Integer is auto-unboxed here; a null id throws NullPointerException
+ out.writeInt(rightVertexId);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException { // restores only the ids; leftVertex/rightVertex stay null until recoveryFromVertexMap is called
+ leftVertexId = in.readInt();
+ rightVertexId = in.readInt();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
new file mode 100644
index 000000000..09a61c116
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+
+@Data
+@AllArgsConstructor
+public class LogicalVertex implements IdentifiedDataSerializable { // one node of the LogicalDag: an id, the Action it wraps and a parallelism value
+ private Integer vertexId; // LogicalDagGenerator passes the action's id here
+ private Action action;
+ private int parallelism;
+
+ public LogicalVertex() { // no-arg constructor; this is the one JobDataSerializerHook#create calls
+ }
+
+ @Override
+ public int getFactoryId() {
+ return JobDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return JobDataSerializerHook.LOGICAL_VERTEX;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException { // wire order: vertexId, action, parallelism
+ out.writeInt(vertexId); // Integer is auto-unboxed here; a null vertexId throws NullPointerException
+ out.writeObject(action);
+ out.writeInt(parallelism);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException { // reads the fields back in the order writeData wrote them
+ vertexId = in.readInt();
+ action = in.readObject();
+ parallelism = in.readInt();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 807602605..2a98be5f0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -15,21 +15,45 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.core.serializable;
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
-public class OperationDataSerializerHook implements DataSerializerHook {
- public static final int PRINT_MESSAGE_OPERATOR = 0;
+/**
+ * A Java Service Provider hook for Hazelcast's Identified Data Serializable
+ * mechanism. This is private API.
+ * All of the Job's data-serializable types are defined in this class.
+ */
+@PrivateApi
+public final class JobDataSerializerHook implements DataSerializerHook {
+
+ /**
+ * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag} class.
+ */
+ public static final int LOGICAL_DAG = 0;
+
+ /**
+ * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex} class.
+ */
+ public static final int LOGICAL_VERTEX = 1;
+
+ /**
+ * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge} class.
+ */
+ public static final int LOGICAL_EDGE = 2;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+ SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
+ SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
);
@Override
@@ -47,8 +71,12 @@ public class OperationDataSerializerHook implements DataSerializerHook {
@Override
public IdentifiedDataSerializable create(int typeId) {
switch (typeId) {
- case PRINT_MESSAGE_OPERATOR:
- return new PrintMessageOperation();
+ case LOGICAL_DAG:
+ return new LogicalDag();
+ case LOGICAL_VERTEX:
+ return new LogicalVertex();
+ case LOGICAL_EDGE:
+ return new LogicalEdge();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
new file mode 100644
index 000000000..900587e31
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 807602605..f56a68353 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -17,19 +17,27 @@
package org.apache.seatunnel.engine.server.serializable;
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
-public class OperationDataSerializerHook implements DataSerializerHook {
+/**
+ * A Java Service Provider hook for Hazelcast's Identified Data Serializable
+ * mechanism. This is private API.
+ * All of the Operation's data-serializable types are defined in this class.
+ */
+@PrivateApi
+public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
- OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+ SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
+ SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
);
@Override
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 34ca898d9..395e857ed 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.NonNull;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-public interface SeaTunnelClientInstance {
+public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
- @NonNull
- HazelcastInstance getHazelcastInstance();
+ public SeaTunnelTransformPluginDiscovery() {
+ super("seatunnel"); // NOTE(review): the meaning of this literal is defined by AbstractPluginDiscovery, not visible here - presumably the plugin namespace/directory; confirm
+ }
- JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+ @Override
+ protected Class<SeaTunnelTransform> getPluginBaseClass() { // tells the base class that discovered plugins are of type SeaTunnelTransform
+ return SeaTunnelTransform.class;
+ }
}