Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/18 02:29:06 UTC

[incubator-seatunnel] branch api-draft updated: Add plugin processor for SeaTunnel API (#1902)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 273121d5 Add plugin processor for SeaTunnel API (#1902)
273121d5 is described below

commit 273121d5a1f066a9e0d70e529f4ee5c0c6cf93b9
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 10:29:02 2022 +0800

    Add plugin processor for SeaTunnel API (#1902)
---
 .../command/SeaTunnelApiTaskExecuteCommand.java    |   1 -
 .../execution/AbstractPluginExecuteProcessor.java  |  74 ++++++++++
 .../flink/execution/PluginExecuteProcessor.java    |  34 +++++
 .../flink/execution/SeaTunnelTaskExecution.java    | 154 +++------------------
 .../core/flink/execution/SinkExecuteProcessor.java |  79 +++++++++++
 .../flink/execution/SourceExecuteProcessor.java    |  73 ++++++++++
 .../flink/execution/TransformExecuteProcessor.java |  80 +++++++++++
 7 files changed, 356 insertions(+), 139 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index f6cc4f8a..faeff4ae 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -53,7 +53,6 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
         SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
         try {
             seaTunnelTaskExecution.execute();
-            LOGGER.info("Flink Execution Plan:{}", seaTunnelTaskExecution.getExecutionPlan());
         } catch (Exception e) {
             throw new CommandExecuteException("Flink job executed failed", e);
         }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
new file mode 100644
index 00000000..ef5a05b3
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.util.TableUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Optional;
+
+public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecuteProcessor {
+
+    protected final FlinkEnvironment flinkEnvironment;
+    protected final List<? extends Config> pluginConfigs;
+    protected final List<T> plugins;
+
+    protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                             List<? extends Config> pluginConfigs) {
+        this.flinkEnvironment = flinkEnvironment;
+        this.pluginConfigs = pluginConfigs;
+        this.plugins = initializePlugins(pluginConfigs);
+    }
+
+    protected abstract List<T> initializePlugins(List<? extends Config> pluginConfigs);
+
+    protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+            String name = pluginConfig.getString(RESULT_TABLE_NAME);
+            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+            if (!TableUtil.tableExists(tableEnvironment, name)) {
+                if (pluginConfig.hasPath("field_name")) {
+                    String fieldName = pluginConfig.getString("field_name");
+                    tableEnvironment.registerDataStream(name, dataStream, fieldName);
+                } else {
+                    tableEnvironment.registerDataStream(name, dataStream);
+                }
+            }
+        }
+    }
+
+    protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+            Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+        }
+        return Optional.empty();
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java
new file mode 100644
index 00000000..c1f07084
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/PluginExecuteProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+public interface PluginExecuteProcessor {
+
+    /**
+     * Execute the current plugins, and return the result data stream.
+     *
+     * @param upstreamDataStreams the upstream data streams.
+     * @return the result data stream
+     */
+    List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception;
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
index 2277e8ac..14ddef73 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
@@ -17,58 +17,33 @@
 
 package org.apache.seatunnel.core.flink.execution;
 
-import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
-import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.EnvironmentFactory;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
-import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Used to execute a SeaTunnelTask.
  */
 public class SeaTunnelTaskExecution {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecution.class);
+
     private final Config config;
     private final FlinkEnvironment flinkEnvironment;
-
-    // todo: initialize in sub class
-    private final List<SeaTunnelParallelSource> sources;
-    private final List<? extends Config> sourceConfigs;
-    private final List<FlinkStreamTransform> transforms;
-    private final List<? extends Config> transformConfigs;
-    private final List<Sink> sinks;
-    private final List<? extends Config> sinkConfigs;
-    private final List<URL> pluginJars = new ArrayList<>();
+    private final PluginExecuteProcessor sourcePluginExecuteProcessor;
+    private final PluginExecuteProcessor transformPluginExecuteProcessor;
+    private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
     public SeaTunnelTaskExecution(Config config) {
         this.config = config;
@@ -76,115 +51,18 @@ public class SeaTunnelTaskExecution {
         this.flinkEnvironment = (FlinkEnvironment) new EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
         this.flinkEnvironment.setJobMode(JobMode.STREAMING);
         this.flinkEnvironment.prepare();
-        this.sourceConfigs = config.getConfigList("source");
-        this.transformConfigs = config.getConfigList("transform");
-        this.sinkConfigs = config.getConfigList("sink");
-
-        this.sources = initializeSources(sourceConfigs);
-        this.transforms = initializeTransforms(transformConfigs);
-        this.sinks = initializeSinks(sinkConfigs);
-        flinkEnvironment.registerPlugin(pluginJars);
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
     }
 
     public void execute() throws Exception {
-        List<DataStream<Row>> data = new ArrayList<>();
-        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
-        for (int i = 0; i < sources.size(); i++) {
-            DataStreamSource<Row> sourceStream = executionEnvironment.addSource(sources.get(i));
-            data.add(sourceStream);
-            Config pluginConfig = sourceConfigs.get(i);
-            registerResultTable(pluginConfig, sourceStream);
-        }
-
-        DataStream<Row> input = data.get(0);
-
-        for (int i = 0; i < transforms.size(); i++) {
-            FlinkStreamTransform transform = transforms.get(i);
-            transform.setConfig(transformConfigs.get(i));
-            transform.prepare(flinkEnvironment);
-            DataStream<Row> stream = fromSourceTable(transformConfigs.get(i)).orElse(input);
-            input = transform.processStream(flinkEnvironment, stream);
-            Config pluginConfig = transformConfigs.get(i);
-            registerResultTable(pluginConfig, input);
-            transform.registerFunction(flinkEnvironment);
-        }
-
-        for (int i = 0; i < sinks.size(); i++) {
-            Config sinkConfig = sinkConfigs.get(i);
-            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            stream.sinkTo(sinks.get(i));
-        }
-        executionEnvironment.execute();
-    }
-
-    private List<Sink> initializeSinks(List<? extends Config> sinkConfigs) {
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
-        return sinkConfigs.stream()
-            .map(sinkConfig -> {
-                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                    "seatunnel",
-                    "sink",
-                    sinkConfig.getString("plugin_name"));
-                pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                SeaTunnelSink<SeaTunnelRow, Object, Object, Object> pluginInstance = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
-                return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
-            }).collect(Collectors.toList());
-    }
-
-    private List<FlinkStreamTransform> initializeTransforms(List<? extends Config> transformConfigs) {
-        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
-        return transformConfigs.stream()
-            .map(transformConfig -> {
-                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                    "seatunnel",
-                    "transform",
-                    transformConfig.getString("plugin_name"));
-                pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                return (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
-            }).collect(Collectors.toList());
-    }
-
-    private List<SeaTunnelParallelSource> initializeSources(List<? extends Config> sourceConfigs) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
-        List<SeaTunnelParallelSource> seaTunnelSources = new ArrayList<>();
-        for (int i = 0; i < sourceConfigs.size(); i++) {
-            Config sourceConfig = sourceConfigs.get(i);
-            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                "seatunnel",
-                "source",
-                sourceConfig.getString("plugin_name"));
-            pluginJars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            seaTunnelSources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
-        }
-        return seaTunnelSources;
-    }
-
-    private void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
-        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
-            String name = pluginConfig.getString(RESULT_TABLE_NAME);
-            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
-            if (!TableUtil.tableExists(tableEnvironment, name)) {
-                if (pluginConfig.hasPath("field_name")) {
-                    String fieldName = pluginConfig.getString("field_name");
-                    tableEnvironment.registerDataStream(name, dataStream, fieldName);
-                } else {
-                    tableEnvironment.registerDataStream(name, dataStream);
-                }
-            }
-        }
-    }
-
-    private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
-        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
-            Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
-        }
-        return Optional.empty();
-    }
+        List<DataStream<Row>> dataStreams = new ArrayList<>();
+        dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
+        dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
+        sinkPluginExecuteProcessor.execute(dataStreams);
 
-    public String getExecutionPlan() {
-        return flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan();
+        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
new file mode 100644
index 00000000..85726023
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Serializable;
+
+public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable, Serializable>> {
+
+    protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                   List<? extends Config> pluginConfigs) {
+        super(flinkEnvironment, pluginConfigs);
+    }
+
+    @Override
+    protected List<Sink<Row, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
+        List<Sink<Row, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "sink",
+                sinkConfig.getString("plugin_name"));
+            pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> pluginInstance =
+                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
+        }).collect(Collectors.toList());
+        flinkEnvironment.registerPlugin(pluginJars);
+        return sinks;
+    }
+
+    @Override
+    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
+        DataStream<Row> input = upstreamDataStreams.get(0);
+        for (int i = 0; i < plugins.size(); i++) {
+            Config sinkConfig = pluginConfigs.get(i);
+            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
+            stream.sinkTo(plugins.get(i));
+        }
+        // the sink is the last stream
+        return null;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
new file mode 100644
index 00000000..2e264af4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelParallelSource> {
+
+    public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                  List<? extends Config> sourceConfigs) {
+        super(flinkEnvironment, sourceConfigs);
+    }
+
+    @Override
+    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
+        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+        List<DataStream<Row>> sources = new ArrayList<>();
+        for (int i = 0; i < plugins.size(); i++) {
+            DataStreamSource<Row> sourceStream = executionEnvironment.addSource(plugins.get(i));
+            Config pluginConfig = pluginConfigs.get(i);
+            registerResultTable(pluginConfig, sourceStream);
+            sources.add(sourceStream);
+        }
+        return sources;
+    }
+
+    @Override
+    protected List<SeaTunnelParallelSource> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        List<SeaTunnelParallelSource> sources = new ArrayList<>();
+        List<URL> jars = new ArrayList<>();
+        for (Config sourceConfig : pluginConfigs) {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "source",
+                sourceConfig.getString("plugin_name"));
+            jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            sources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+        }
+        flinkEnvironment.registerPlugin(jars);
+        return sources;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
new file mode 100644
index 00000000..508dec9a
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+    protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
+                                        List<? extends Config> pluginConfigs) {
+        super(flinkEnvironment, pluginConfigs);
+    }
+
+    @Override
+    protected List<FlinkStreamTransform> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        List<FlinkStreamTransform> transforms = pluginConfigs.stream()
+            .map(transformConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                    "seatunnel",
+                    "transform",
+                    transformConfig.getString("plugin_name"));
+                pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(transformConfig);
+                pluginInstance.prepare(flinkEnvironment);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+        flinkEnvironment.registerPlugin(pluginJars);
+        return transforms;
+    }
+
+    @Override
+    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
+        if (plugins.isEmpty()) {
+            return upstreamDataStreams;
+        }
+        DataStream<Row> input = upstreamDataStreams.get(0);
+        List<DataStream<Row>> result = new ArrayList<>();
+        for (int i = 0; i < plugins.size(); i++) {
+            FlinkStreamTransform transform = plugins.get(i);
+            Config pluginConfig = pluginConfigs.get(i);
+            DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
+            input = transform.processStream(flinkEnvironment, stream);
+            registerResultTable(pluginConfig, input);
+            transform.registerFunction(flinkEnvironment);
+            result.add(input);
+        }
+        return result;
+    }
+}
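
For readers following the api-draft refactor, here is a minimal sketch of a custom processor built on the new abstraction. The class name NoOpExecuteProcessor and its pass-through behaviour are hypothetical illustrations; only AbstractPluginExecuteProcessor and PluginExecuteProcessor come from the commit above.

    package org.apache.seatunnel.core.flink.execution;

    import org.apache.seatunnel.flink.FlinkEnvironment;

    import org.apache.seatunnel.shade.com.typesafe.config.Config;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    import java.util.Collections;
    import java.util.List;

    // Hypothetical example, not part of the commit: a processor that discovers no plugins
    // and simply forwards the upstream streams unchanged.
    public class NoOpExecuteProcessor extends AbstractPluginExecuteProcessor<Void> {

        protected NoOpExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                       List<? extends Config> pluginConfigs) {
            super(flinkEnvironment, pluginConfigs);
        }

        @Override
        protected List<Void> initializePlugins(List<? extends Config> pluginConfigs) {
            // A real processor would run plugin discovery here and register plugin jars
            // with the FlinkEnvironment, as the source/transform/sink processors above do.
            return Collections.emptyList();
        }

        @Override
        public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
            // Pass the upstream data streams through to the next stage unchanged.
            return upstreamDataStreams;
        }
    }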