You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/17 15:02:07 UTC

[incubator-seatunnel] branch api-draft updated: Add Transform to new SeaTunnel API (#1900)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 62b304ad Add Transform to new SeaTunnel API (#1900)
62b304ad is described below

commit 62b304ad9e3ada7d6785e36ca6d9ba581006365d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue May 17 23:02:02 2022 +0800

    Add Transform to new SeaTunnel API (#1900)
---
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 109 +-----------
 .../flink/execution/SeaTunnelTaskExecution.java    | 190 +++++++++++++++++++++
 .../example/flink/SeaTunnelApiExample.java         |   2 +-
 .../examples/seatunnel_fake_to_console.conf        |  53 ++++++
 4 files changed, 249 insertions(+), 105 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index 8673ba02..f6cc4f8a 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,40 +17,19 @@
 
 package org.apache.seatunnel.core.flink.command;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+import org.apache.seatunnel.core.flink.execution.SeaTunnelTaskExecution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URL;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 /**
  * todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
@@ -68,94 +47,16 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
 
     @Override
     public void execute() throws CommandExecuteException {
-        EngineType engine = flinkCommandArgs.getEngineType();
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder(configFile).getConfig();
-        FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
-
-        SeaTunnelParallelSource source = getSource(config);
-        // todo: add basic type
-        Sink<Row, Object, Object, Object> flinkSink = getSink(config);
-
-        registerPlugins(flinkEnvironment);
-
-        StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
-        // support multiple sources/sink
-        DataStreamSource<Row> dataStream = streamExecutionEnvironment.addSource(source);
-        registerTable(dataStream, "fake_table", "name, age", flinkEnvironment);
-        // todo: add transform
-        DataStream<Row> transformOutputStream = TableUtil.tableToDataStream(
-            flinkEnvironment.getStreamTableEnvironment(),
-            flinkEnvironment.getStreamTableEnvironment().sqlQuery("select * from fake_table"),
-            false);
-        // add sink
-        transformOutputStream.sinkTo(flinkSink);
+        SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
         try {
-            streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
+            seaTunnelTaskExecution.execute();
+            LOGGER.info("Flink Execution Plan:{}", seaTunnelTaskExecution.getExecutionPlan());
         } catch (Exception e) {
-            throw new CommandExecuteException("SeaTunnelAPITaskExecuteCommand execute failed", e);
-        }
-    }
-
-    private void registerTable(DataStream<Row> dataStream, String tableName, String fields, FlinkEnvironment flinkEnvironment) {
-        StreamTableEnvironment streamTableEnvironment = flinkEnvironment.getStreamTableEnvironment();
-        if (!TableUtil.tableExists(streamTableEnvironment, tableName)) {
-            streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
+            throw new CommandExecuteException("Flink job executed failed", e);
         }
     }
 
-    private DataStream<Row> fromSourceTable(FlinkEnvironment flinkEnvironment, String tableName) {
-        StreamTableEnvironment streamTableEnvironment = flinkEnvironment.getStreamTableEnvironment();
-        Table fakeTable = streamTableEnvironment.scan(tableName);
-        return TableUtil.tableToDataStream(streamTableEnvironment, fakeTable, true);
-    }
-
-    private SeaTunnelParallelSource getSource(Config config) {
-        PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
-        // todo: use FactoryUtils to load the plugin
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
-        return new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
-    }
-
-    private Sink<Row, Object, Object, Object> getSink(Config config) {
-        PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
-        return flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier), Collections.emptyMap());
-    }
-
-    private List<URL> getSourPluginJars(PluginIdentifier pluginIdentifier) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
-        return sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-    }
-
-    private List<URL> getSinkPluginJars(PluginIdentifier pluginIdentifier) {
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        return sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
-    }
-
-    private PluginIdentifier getSourcePluginIdentifier() {
-        return PluginIdentifier.of("seatunnel", "source", "FakeSource");
-    }
-
-    private PluginIdentifier getSinkPluginIdentifier() {
-        return PluginIdentifier.of("seatunnel", "sink", "Console");
-    }
-
-    private void registerPlugins(FlinkEnvironment flinkEnvironment) {
-        List<URL> pluginJars = new ArrayList<>();
-        pluginJars.addAll(getSourPluginJars(getSourcePluginIdentifier()));
-        pluginJars.addAll(getSinkPluginJars(getSinkPluginIdentifier()));
-        flinkEnvironment.registerPlugin(pluginJars);
-    }
-
-    private FlinkEnvironment getFlinkEnvironment(Config config) {
-        FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
-        flinkEnvironment.setJobMode(JobMode.STREAMING);
-        flinkEnvironment.setConfig(config.getConfig("env"));
-        flinkEnvironment.prepare();
-
-        return flinkEnvironment;
-    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
new file mode 100644
index 00000000..2277e8ac
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+import org.apache.seatunnel.flink.util.TableUtil;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Builds a Flink streaming job from a SeaTunnel job config and executes it.
+ *
+ * <p>The {@code source}, {@code transform} and {@code sink} config lists are read in the constructor; the
+ * {@code plugin_name} of every entry is used to discover the plugin instance and the jars it needs.
+ * {@link #execute()} then wires sources, transforms and sinks into the Flink
+ * {@link StreamExecutionEnvironment} and submits the job.
+ */
+public class SeaTunnelTaskExecution {
+
+    /** Engine type used in every {@link PluginIdentifier} built by this class. */
+    private static final String ENGINE_TYPE = "seatunnel";
+    /** Config key holding the plugin name of a source/transform/sink entry. */
+    private static final String PLUGIN_NAME = "plugin_name";
+
+    private final Config config;
+    private final FlinkEnvironment flinkEnvironment;
+
+    // todo: initialize in sub class
+    // Each plugin list is index-aligned with the config list it was created from.
+    private final List<SeaTunnelParallelSource> sources;
+    private final List<? extends Config> sourceConfigs;
+    private final List<FlinkStreamTransform> transforms;
+    private final List<? extends Config> transformConfigs;
+    private final List<Sink> sinks;
+    private final List<? extends Config> sinkConfigs;
+    // Filled as a side effect of the initialize* methods, then registered on the environment.
+    private final List<URL> pluginJars = new ArrayList<>();
+
+    /** Plan captured by {@link #execute()} right before submission; {@code null} until then. */
+    private String executionPlan;
+
+    /**
+     * Creates the Flink environment (always in {@link JobMode#STREAMING}), instantiates every configured
+     * plugin and registers the discovered plugin jars on the environment.
+     *
+     * @param config the job config; must contain the {@code source}, {@code transform} and {@code sink} lists
+     */
+    public SeaTunnelTaskExecution(Config config) {
+        this.config = config;
+        // todo: create the environment
+        this.flinkEnvironment = (FlinkEnvironment) new EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
+        this.flinkEnvironment.setJobMode(JobMode.STREAMING);
+        this.flinkEnvironment.prepare();
+        this.sourceConfigs = config.getConfigList("source");
+        this.transformConfigs = config.getConfigList("transform");
+        this.sinkConfigs = config.getConfigList("sink");
+
+        this.sources = initializeSources(sourceConfigs);
+        this.transforms = initializeTransforms(transformConfigs);
+        this.sinks = initializeSinks(sinkConfigs);
+        // Must run after the initialize* calls above, which collect the jars into pluginJars.
+        flinkEnvironment.registerPlugin(pluginJars);
+    }
+
+    /**
+     * Wires sources, transforms and sinks into the Flink topology and runs the job.
+     *
+     * <p>The execution plan is captured just before the job is submitted so that
+     * {@link #getExecutionPlan()} still works once this method has returned.
+     *
+     * @throws IllegalStateException if no source is configured
+     * @throws Exception             if building or executing the Flink job fails
+     */
+    public void execute() throws Exception {
+        if (sources.isEmpty()) {
+            // Fail with a descriptive message instead of an IndexOutOfBoundsException further down.
+            throw new IllegalStateException("At least one source must be configured to execute a SeaTunnel task");
+        }
+        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+
+        DataStream<Row> input = addSources(executionEnvironment);
+        input = applyTransforms(input);
+        addSinks(input);
+
+        // Flink documents that the plan has to be requested before the job is executed, so snapshot it now.
+        executionPlan = executionEnvironment.getExecutionPlan();
+        executionEnvironment.execute();
+    }
+
+    /** Adds every source to the environment, registers its result table and returns the first source's stream. */
+    private DataStream<Row> addSources(StreamExecutionEnvironment executionEnvironment) {
+        DataStream<Row> firstStream = null;
+        for (int i = 0; i < sources.size(); i++) {
+            DataStreamSource<Row> sourceStream = executionEnvironment.addSource(sources.get(i));
+            registerResultTable(sourceConfigs.get(i), sourceStream);
+            if (firstStream == null) {
+                firstStream = sourceStream;
+            }
+        }
+        return firstStream;
+    }
+
+    /** Chains the transforms in config order, starting from {@code input}, and returns the last output stream. */
+    private DataStream<Row> applyTransforms(DataStream<Row> input) {
+        DataStream<Row> current = input;
+        for (int i = 0; i < transforms.size(); i++) {
+            FlinkStreamTransform transform = transforms.get(i);
+            Config transformConfig = transformConfigs.get(i);
+            transform.setConfig(transformConfig);
+            transform.prepare(flinkEnvironment);
+            // An explicit source table wins; otherwise consume the previous stage's output.
+            DataStream<Row> stream = fromSourceTable(transformConfig).orElse(current);
+            current = transform.processStream(flinkEnvironment, stream);
+            registerResultTable(transformConfig, current);
+            transform.registerFunction(flinkEnvironment);
+        }
+        return current;
+    }
+
+    /** Attaches every sink to its configured source table, or to {@code input} when none is configured. */
+    private void addSinks(DataStream<Row> input) {
+        for (int i = 0; i < sinks.size(); i++) {
+            DataStream<Row> stream = fromSourceTable(sinkConfigs.get(i)).orElse(input);
+            stream.sinkTo(sinks.get(i));
+        }
+    }
+
+    /**
+     * Discovers the SeaTunnel sink plugin named by each config entry and converts it to a Flink {@link Sink}.
+     * The jars of every discovered plugin are added to {@code pluginJars}.
+     */
+    private List<Sink> initializeSinks(List<? extends Config> sinkConfigs) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
+        return sinkConfigs.stream()
+            .map(sinkConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                    ENGINE_TYPE,
+                    "sink",
+                    sinkConfig.getString(PLUGIN_NAME));
+                pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                SeaTunnelSink<SeaTunnelRow, Object, Object, Object> pluginInstance = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
+            }).collect(Collectors.toList());
+    }
+
+    /**
+     * Discovers the Flink transform plugin named by each config entry.
+     * The jars of every discovered plugin are added to {@code pluginJars}.
+     */
+    private List<FlinkStreamTransform> initializeTransforms(List<? extends Config> transformConfigs) {
+        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
+        return transformConfigs.stream()
+            .map(transformConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                    ENGINE_TYPE,
+                    "transform",
+                    transformConfig.getString(PLUGIN_NAME));
+                pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+                return (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+            }).collect(Collectors.toList());
+    }
+
+    /**
+     * Discovers the SeaTunnel source plugin named by each config entry and wraps it in a
+     * {@link SeaTunnelParallelSource}. The jars of every discovered plugin are added to {@code pluginJars}.
+     */
+    private List<SeaTunnelParallelSource> initializeSources(List<? extends Config> sourceConfigs) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        List<SeaTunnelParallelSource> seaTunnelSources = new ArrayList<>();
+        for (Config sourceConfig : sourceConfigs) {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                ENGINE_TYPE,
+                "source",
+                sourceConfig.getString(PLUGIN_NAME));
+            pluginJars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            seaTunnelSources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+        }
+        return seaTunnelSources;
+    }
+
+    /**
+     * Registers {@code dataStream} as a table when the plugin config declares a result table name and no table
+     * with that name exists yet. An optional {@code field_name} entry is passed through as the field list.
+     */
+    private void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
+        if (!pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+            return;
+        }
+        String name = pluginConfig.getString(RESULT_TABLE_NAME);
+        StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+        if (TableUtil.tableExists(tableEnvironment, name)) {
+            return;
+        }
+        if (pluginConfig.hasPath("field_name")) {
+            String fieldName = pluginConfig.getString("field_name");
+            tableEnvironment.registerDataStream(name, dataStream, fieldName);
+        } else {
+            tableEnvironment.registerDataStream(name, dataStream);
+        }
+    }
+
+    /**
+     * Resolves the stream a plugin should read from.
+     *
+     * @return the stream of the table named by the plugin's source table entry, or empty when the plugin config
+     *         has no such entry
+     */
+    private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+        if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+            return Optional.empty();
+        }
+        StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+        Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+        return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+    }
+
+    /**
+     * Returns the Flink execution plan of this task.
+     *
+     * <p>After {@link #execute()} has submitted the job, the plan captured at submission time is returned;
+     * before that, the plan is requested from the environment directly.
+     *
+     * @return the execution plan as produced by {@link StreamExecutionEnvironment#getExecutionPlan()}
+     */
+    public String getExecutionPlan() {
+        if (executionPlan != null) {
+            return executionPlan;
+        }
+        return flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan();
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
index 3f13e4b5..3e206754 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
@@ -32,7 +32,7 @@ import java.nio.file.Paths;
 public class SeaTunnelApiExample {
 
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
-        String configFile = getTestConfigFile("/examples/fake_to_console.conf");
+        String configFile = getTestConfigFile("/examples/seatunnel_fake_to_console.conf");
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         flinkCommandArgs.setConfigFile(configFile);
         flinkCommandArgs.setCheckConfig(false);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf
new file mode 100644
index 00000000..4c0fb709
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file