You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/18 02:29:06 UTC
[incubator-seatunnel] branch api-draft updated: Add plugin processor for SeaTunnel API (#1902)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 273121d5 Add plugin processor for SeaTunnel API (#1902)
273121d5 is described below
commit 273121d5a1f066a9e0d70e529f4ee5c0c6cf93b9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 10:29:02 2022 +0800
Add plugin processor for SeaTunnel API (#1902)
---
.../command/SeaTunnelApiTaskExecuteCommand.java | 1 -
.../execution/AbstractPluginExecuteProcessor.java | 74 ++++++++++
.../flink/execution/PluginExecuteProcessor.java | 34 +++++
.../flink/execution/SeaTunnelTaskExecution.java | 154 +++------------------
.../core/flink/execution/SinkExecuteProcessor.java | 79 +++++++++++
.../flink/execution/SourceExecuteProcessor.java | 73 ++++++++++
.../flink/execution/TransformExecuteProcessor.java | 80 +++++++++++
7 files changed, 356 insertions(+), 139 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index f6cc4f8a..faeff4ae 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -53,7 +53,6 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
try {
seaTunnelTaskExecution.execute();
- LOGGER.info("Flink Execution Plan:{}", seaTunnelTaskExecution.getExecutionPlan());
} catch (Exception e) {
throw new CommandExecuteException("Flink job executed failed", e);
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
new file mode 100644
index 00000000..ef5a05b3
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.util.TableUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Common base for the processors that run one kind of plugin (type {@code T}) on a
+ * {@link FlinkEnvironment}. It keeps the plugin configs next to the plugin instances built
+ * from them and offers helpers for the result-table / source-table settings of a plugin config.
+ *
+ * @param <T> the plugin type handled by the concrete processor
+ */
+public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecuteProcessor {
+
+    protected final FlinkEnvironment flinkEnvironment;
+    protected final List<? extends Config> pluginConfigs;
+    protected final List<T> plugins;
+
+    /**
+     * Stores the environment and the configs, then builds the plugin list through
+     * {@link #initializePlugins(List)}. Both fields are assigned before that call, so
+     * implementations may already read {@code flinkEnvironment} there.
+     *
+     * @param flinkEnvironment the Flink environment the plugins run in
+     * @param pluginConfigs    the configs of the plugins handled by this processor
+     */
+    protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                             List<? extends Config> pluginConfigs) {
+        this.flinkEnvironment = flinkEnvironment;
+        this.pluginConfigs = pluginConfigs;
+        this.plugins = initializePlugins(pluginConfigs);
+    }
+
+    /**
+     * Creates the plugin instances for the given configs. Called once, from the constructor.
+     *
+     * @param pluginConfigs the plugin configs
+     * @return the plugin instances
+     */
+    protected abstract List<T> initializePlugins(List<? extends Config> pluginConfigs);
+
+    /**
+     * Registers {@code dataStream} as a table under the name configured as the plugin's result
+     * table. Nothing happens when the config has no result table name or when a table with that
+     * name already exists. An optional {@code field_name} entry is passed along as the field
+     * expression of the registration.
+     *
+     * @param pluginConfig the config of the plugin that produced the stream
+     * @param dataStream   the stream to register
+     */
+    protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
+        if (!pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+            return;
+        }
+        String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
+        StreamTableEnvironment tableEnv = flinkEnvironment.getStreamTableEnvironment();
+        if (TableUtil.tableExists(tableEnv, tableName)) {
+            return;
+        }
+        if (pluginConfig.hasPath("field_name")) {
+            String fields = pluginConfig.getString("field_name");
+            tableEnv.registerDataStream(tableName, dataStream, fields);
+            return;
+        }
+        tableEnv.registerDataStream(tableName, dataStream);
+    }
+
+    /**
+     * Resolves the table configured as the plugin's source table into a data stream.
+     *
+     * @param pluginConfig the plugin config
+     * @return the stream of the configured source table, or an empty {@link Optional} when the
+     *         config names no source table
+     */
+    protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+        if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+            return Optional.empty();
+        }
+        StreamTableEnvironment tableEnv = flinkEnvironment.getStreamTableEnvironment();
+        String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
+        Table sourceTable = tableEnv.scan(sourceTableName);
+        DataStream<Row> sourceStream = TableUtil.tableToDataStream(tableEnv, sourceTable, true);
+        return Optional.ofNullable(sourceStream);
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java
new file mode 100644
index 00000000..c1f07084
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * One stage of a SeaTunnel job on Flink: takes the data streams of the previous stage and
+ * attaches its own plugins to them.
+ */
+public interface PluginExecuteProcessor {
+
+ /**
+ * Execute the current plugins, and return the result data streams.
+ *
+ * @param upstreamDataStreams the data streams produced by the previous stage.
+ * @return the result data streams of this stage
+ * @throws Exception if the plugins cannot be applied to the streams
+ */
+ List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception;
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
index 2277e8ac..14ddef73 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
@@ -17,58 +17,33 @@
package org.apache.seatunnel.core.flink.execution;
-import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
-import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.EnvironmentFactory;
import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
-import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.net.URL;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Used to execute a SeaTunnelTask.
*/
public class SeaTunnelTaskExecution {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecution.class);
+
private final Config config;
private final FlinkEnvironment flinkEnvironment;
-
- // todo: initialize in sub class
- private final List<SeaTunnelParallelSource> sources;
- private final List<? extends Config> sourceConfigs;
- private final List<FlinkStreamTransform> transforms;
- private final List<? extends Config> transformConfigs;
- private final List<Sink> sinks;
- private final List<? extends Config> sinkConfigs;
- private final List<URL> pluginJars = new ArrayList<>();
+ private final PluginExecuteProcessor sourcePluginExecuteProcessor;
+ private final PluginExecuteProcessor transformPluginExecuteProcessor;
+ private final PluginExecuteProcessor sinkPluginExecuteProcessor;
public SeaTunnelTaskExecution(Config config) {
this.config = config;
@@ -76,115 +51,18 @@ public class SeaTunnelTaskExecution {
this.flinkEnvironment = (FlinkEnvironment) new EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
this.flinkEnvironment.setJobMode(JobMode.STREAMING);
this.flinkEnvironment.prepare();
- this.sourceConfigs = config.getConfigList("source");
- this.transformConfigs = config.getConfigList("transform");
- this.sinkConfigs = config.getConfigList("sink");
-
- this.sources = initializeSources(sourceConfigs);
- this.transforms = initializeTransforms(transformConfigs);
- this.sinks = initializeSinks(sinkConfigs);
- flinkEnvironment.registerPlugin(pluginJars);
+ this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
+ this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
+ this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
}
public void execute() throws Exception {
- List<DataStream<Row>> data = new ArrayList<>();
- StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
- for (int i = 0; i < sources.size(); i++) {
- DataStreamSource<Row> sourceStream = executionEnvironment.addSource(sources.get(i));
- data.add(sourceStream);
- Config pluginConfig = sourceConfigs.get(i);
- registerResultTable(pluginConfig, sourceStream);
- }
-
- DataStream<Row> input = data.get(0);
-
- for (int i = 0; i < transforms.size(); i++) {
- FlinkStreamTransform transform = transforms.get(i);
- transform.setConfig(transformConfigs.get(i));
- transform.prepare(flinkEnvironment);
- DataStream<Row> stream = fromSourceTable(transformConfigs.get(i)).orElse(input);
- input = transform.processStream(flinkEnvironment, stream);
- Config pluginConfig = transformConfigs.get(i);
- registerResultTable(pluginConfig, input);
- transform.registerFunction(flinkEnvironment);
- }
-
- for (int i = 0; i < sinks.size(); i++) {
- Config sinkConfig = sinkConfigs.get(i);
- DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- stream.sinkTo(sinks.get(i));
- }
- executionEnvironment.execute();
- }
-
- private List<Sink> initializeSinks(List<? extends Config> sinkConfigs) {
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
- FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
- return sinkConfigs.stream()
- .map(sinkConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "sink",
- sinkConfig.getString("plugin_name"));
- pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<SeaTunnelRow, Object, Object, Object> pluginInstance = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
- }).collect(Collectors.toList());
- }
-
- private List<FlinkStreamTransform> initializeTransforms(List<? extends Config> transformConfigs) {
- SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
- return transformConfigs.stream()
- .map(transformConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "transform",
- transformConfig.getString("plugin_name"));
- pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- return (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
- }).collect(Collectors.toList());
- }
-
- private List<SeaTunnelParallelSource> initializeSources(List<? extends Config> sourceConfigs) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- List<SeaTunnelParallelSource> seaTunnelSources = new ArrayList<>();
- for (int i = 0; i < sourceConfigs.size(); i++) {
- Config sourceConfig = sourceConfigs.get(i);
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "source",
- sourceConfig.getString("plugin_name"));
- pluginJars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- seaTunnelSources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
- }
- return seaTunnelSources;
- }
-
- private void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
- if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
- String name = pluginConfig.getString(RESULT_TABLE_NAME);
- StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
- if (pluginConfig.hasPath("field_name")) {
- String fieldName = pluginConfig.getString("field_name");
- tableEnvironment.registerDataStream(name, dataStream, fieldName);
- } else {
- tableEnvironment.registerDataStream(name, dataStream);
- }
- }
- }
- }
-
- private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
- if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
- Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
- return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
- }
- return Optional.empty();
- }
+ List<DataStream<Row>> dataStreams = new ArrayList<>();
+ dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
+ dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
+ sinkPluginExecuteProcessor.execute(dataStreams);
- public String getExecutionPlan() {
- return flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan();
+ LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+ flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
new file mode 100644
index 00000000..85726023
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Serializable;
+
+/**
+ * Discovers the configured SeaTunnel sinks, converts them to Flink sinks and attaches them to
+ * the upstream data stream.
+ */
+public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable, Serializable>> {
+
+    protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                   List<? extends Config> pluginConfigs) {
+        super(flinkEnvironment, pluginConfigs);
+    }
+
+    /**
+     * Looks up each sink by its {@code plugin_name}, converts it to a Flink {@link Sink} and
+     * registers the plugin jars that were discovered along the way with the Flink environment.
+     *
+     * @param pluginConfigs the sink configs, one per plugin
+     * @return the converted Flink sinks, one per config
+     */
+    @Override
+    protected List<Sink<Row, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
+        List<Sink<Row, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "sink",
+                sinkConfig.getString("plugin_name"));
+            pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> pluginInstance =
+                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
+        }).collect(Collectors.toList());
+        flinkEnvironment.registerPlugin(pluginJars);
+        return sinks;
+    }
+
+    /**
+     * Attaches every sink to its input stream. A sink reads from the table named by its
+     * source-table setting when it has one, otherwise from the first upstream data stream.
+     *
+     * @param upstreamDataStreams the data streams of the previous stage; must not be empty
+     * @return always an empty list, because sinks terminate the pipeline
+     */
+    @Override
+    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
+        DataStream<Row> input = upstreamDataStreams.get(0);
+        for (int i = 0; i < plugins.size(); i++) {
+            Config sinkConfig = pluginConfigs.get(i);
+            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
+            stream.sinkTo(plugins.get(i));
+        }
+        // Sinks are the last stage: hand back an empty list rather than null so that callers
+        // can treat the result like the one of any other processor.
+        return Collections.emptyList();
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
new file mode 100644
index 00000000..2e264af4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelParallelSource> {
+
+ public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+ List<? extends Config> sourceConfigs) {
+ super(flinkEnvironment, sourceConfigs);
+ }
+
+ @Override
+ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
+ StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+ List<DataStream<Row>> sources = new ArrayList<>();
+ for (int i = 0; i < plugins.size(); i++) {
+ DataStreamSource<Row> sourceStream = executionEnvironment.addSource(plugins.get(i));
+ Config pluginConfig = pluginConfigs.get(i);
+ registerResultTable(pluginConfig, sourceStream);
+ sources.add(sourceStream);
+ }
+ return sources;
+ }
+
+ @Override
+ protected List<SeaTunnelParallelSource> initializePlugins(List<? extends Config> pluginConfigs) {
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+ List<SeaTunnelParallelSource> sources = new ArrayList<>();
+ List<URL> jars = new ArrayList<>();
+ for (Config sourceConfig : pluginConfigs) {
+ PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+ "seatunnel",
+ "source",
+ sourceConfig.getString("plugin_name"));
+ jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+ sources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+ }
+ flinkEnvironment.registerPlugin(jars);
+ return sources;
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
new file mode 100644
index 00000000..508dec9a
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Discovers the configured {@link FlinkStreamTransform} plugins and chains them, in
+ * configuration order, onto the upstream data stream.
+ */
+public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+
+    protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                        List<? extends Config> pluginConfigs) {
+        super(flinkEnvironment, pluginConfigs);
+    }
+
+    /**
+     * Looks up each transform by its {@code plugin_name}, hands it its config, prepares it
+     * against the Flink environment and registers the discovered plugin jars.
+     *
+     * @param pluginConfigs the transform configs, one per plugin
+     * @return the prepared transforms, one per config
+     */
+    @Override
+    protected List<FlinkStreamTransform> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        List<FlinkStreamTransform> transforms = pluginConfigs.stream()
+            .map(transformConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                    "seatunnel",
+                    "transform",
+                    transformConfig.getString("plugin_name"));
+                pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(transformConfig);
+                pluginInstance.prepare(flinkEnvironment);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+        flinkEnvironment.registerPlugin(pluginJars);
+        return transforms;
+    }
+
+    /**
+     * Applies the transforms one after another. A transform reads from the table named by its
+     * source-table setting when it has one; otherwise it reads the output of the previous
+     * transform (the first upstream data stream for the first transform). Every output is
+     * registered as a result table when the transform config asks for one.
+     *
+     * @param upstreamDataStreams the data streams of the previous stage; must not be empty when
+     *                            at least one transform is configured
+     * @return {@code upstreamDataStreams} unchanged when no transform is configured, otherwise
+     *         a single-element list holding the output of the last transform
+     */
+    @Override
+    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
+        if (plugins.isEmpty()) {
+            return upstreamDataStreams;
+        }
+        DataStream<Row> input = upstreamDataStreams.get(0);
+        for (int i = 0; i < plugins.size(); i++) {
+            FlinkStreamTransform transform = plugins.get(i);
+            Config pluginConfig = pluginConfigs.get(i);
+            DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
+            input = transform.processStream(flinkEnvironment, stream);
+            registerResultTable(pluginConfig, input);
+            transform.registerFunction(flinkEnvironment);
+        }
+        // The next stage consumes the first element of the returned list as its default input.
+        // Return only the final output so that a sink without a source table is wired to the end
+        // of the transform chain, not to the output of the first transform. Intermediate outputs
+        // stay reachable through their registered result tables.
+        List<DataStream<Row>> result = new ArrayList<>();
+        result.add(input);
+        return result;
+    }
+}