You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/17 15:02:07 UTC
[incubator-seatunnel] branch api-draft updated: Add Transform to new SeaTunnel API (#1900)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 62b304ad Add Transform to new SeaTunnel API (#1900)
62b304ad is described below
commit 62b304ad9e3ada7d6785e36ca6d9ba581006365d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue May 17 23:02:02 2022 +0800
Add Transform to new SeaTunnel API (#1900)
---
.../command/SeaTunnelApiTaskExecuteCommand.java | 109 +-----------
.../flink/execution/SeaTunnelTaskExecution.java | 190 +++++++++++++++++++++
.../example/flink/SeaTunnelApiExample.java | 2 +-
.../examples/seatunnel_fake_to_console.conf | 53 ++++++
4 files changed, 249 insertions(+), 105 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index 8673ba02..f6cc4f8a 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,40 +17,19 @@
package org.apache.seatunnel.core.flink.command;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+import org.apache.seatunnel.core.flink.execution.SeaTunnelTaskExecution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URL;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
/**
* todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
@@ -68,94 +47,16 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
+    /**
+     * Loads the config file named by the command arguments and runs it as a Flink job through
+     * {@link SeaTunnelTaskExecution}.
+     *
+     * @throws CommandExecuteException if running the job or logging its execution plan throws;
+     *     resolving the config path, parsing the config and constructing the execution happen
+     *     before the try block and are not wrapped
+     */
@Override
public void execute() throws CommandExecuteException {
- EngineType engine = flinkCommandArgs.getEngineType();
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
- FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
-
- SeaTunnelParallelSource source = getSource(config);
- // todo: add basic type
- Sink<Row, Object, Object, Object> flinkSink = getSink(config);
-
- registerPlugins(flinkEnvironment);
-
- StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
- // support multiple sources/sink
- DataStreamSource<Row> dataStream = streamExecutionEnvironment.addSource(source);
- registerTable(dataStream, "fake_table", "name, age", flinkEnvironment);
- // todo: add transform
- DataStream<Row> transformOutputStream = TableUtil.tableToDataStream(
- flinkEnvironment.getStreamTableEnvironment(),
- flinkEnvironment.getStreamTableEnvironment().sqlQuery("select * from fake_table"),
- false);
- // add sink
- transformOutputStream.sinkTo(flinkSink);
+ SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
try {
- streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
+ seaTunnelTaskExecution.execute();
+            // NOTE(review): this line runs only after execute() returns. If getExecutionPlan() asks
+            // Flink for a fresh plan, it may fail once execute() has cleared the environment's
+            // transformations, turning a finished job into a CommandExecuteException -- confirm
+            // against SeaTunnelTaskExecution#getExecutionPlan.
+ LOGGER.info("Flink Execution Plan:{}", seaTunnelTaskExecution.getExecutionPlan());
} catch (Exception e) {
- throw new CommandExecuteException("SeaTunnelAPITaskExecuteCommand execute failed", e);
- }
- }
-
- private void registerTable(DataStream<Row> dataStream, String tableName, String fields, FlinkEnvironment flinkEnvironment) {
- StreamTableEnvironment streamTableEnvironment = flinkEnvironment.getStreamTableEnvironment();
- if (!TableUtil.tableExists(streamTableEnvironment, tableName)) {
- streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
+ throw new CommandExecuteException("Flink job executed failed", e);
}
}
- private DataStream<Row> fromSourceTable(FlinkEnvironment flinkEnvironment, String tableName) {
- StreamTableEnvironment streamTableEnvironment = flinkEnvironment.getStreamTableEnvironment();
- Table fakeTable = streamTableEnvironment.scan(tableName);
- return TableUtil.tableToDataStream(streamTableEnvironment, fakeTable, true);
- }
-
- private SeaTunnelParallelSource getSource(Config config) {
- PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
- // todo: use FactoryUtils to load the plugin
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- return new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
- }
-
- private Sink<Row, Object, Object, Object> getSink(Config config) {
- PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
- FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
- return flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier), Collections.emptyMap());
- }
-
- private List<URL> getSourPluginJars(PluginIdentifier pluginIdentifier) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- return sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
- }
-
- private List<URL> getSinkPluginJars(PluginIdentifier pluginIdentifier) {
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
- return sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
- }
-
- private PluginIdentifier getSourcePluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "source", "FakeSource");
- }
-
- private PluginIdentifier getSinkPluginIdentifier() {
- return PluginIdentifier.of("seatunnel", "sink", "Console");
- }
-
- private void registerPlugins(FlinkEnvironment flinkEnvironment) {
- List<URL> pluginJars = new ArrayList<>();
- pluginJars.addAll(getSourPluginJars(getSourcePluginIdentifier()));
- pluginJars.addAll(getSinkPluginJars(getSinkPluginIdentifier()));
- flinkEnvironment.registerPlugin(pluginJars);
- }
-
- private FlinkEnvironment getFlinkEnvironment(Config config) {
- FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
- flinkEnvironment.setJobMode(JobMode.STREAMING);
- flinkEnvironment.setConfig(config.getConfig("env"));
- flinkEnvironment.prepare();
-
- return flinkEnvironment;
- }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
new file mode 100644
index 00000000..2277e8ac
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SeaTunnelTaskExecution.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.execution;
+
+import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
+import org.apache.seatunnel.flink.util.TableUtil;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Used to execute a SeaTunnelTask.
+ *
+ * <p>Builds a Flink streaming DAG from the {@code source}, {@code transform} and {@code sink}
+ * sections of the job config. Plugins are wired together through the tables named by their
+ * {@code RESULT_TABLE_NAME} / {@code SOURCE_TABLE_NAME} options; a transform or sink without a
+ * source table consumes the output of the previous step, starting from the first source.
+ */
+public class SeaTunnelTaskExecution {
+
+    private final Config config;
+    private final FlinkEnvironment flinkEnvironment;
+
+    // todo: initialize in sub class
+    private final List<SeaTunnelParallelSource> sources;
+    private final List<? extends Config> sourceConfigs;
+    private final List<FlinkStreamTransform> transforms;
+    private final List<? extends Config> transformConfigs;
+    private final List<Sink<Row, Object, Object, Object>> sinks;
+    private final List<? extends Config> sinkConfigs;
+    // Jar paths of every discovered plugin; registered with the environment by the constructor.
+    private final List<URL> pluginJars = new ArrayList<>();
+    // Plan captured by execute() just before the job is submitted; null until then.
+    private String executionPlan;
+
+    /**
+     * Creates the Flink environment and instantiates every plugin declared in {@code config}.
+     *
+     * @param config the job config; {@code source} and {@code sink} are required,
+     *     {@code transform} is optional
+     */
+    public SeaTunnelTaskExecution(Config config) {
+        this.config = config;
+        // todo: create the environment
+        this.flinkEnvironment = (FlinkEnvironment) new EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
+        this.flinkEnvironment.setJobMode(JobMode.STREAMING);
+        this.flinkEnvironment.prepare();
+        this.sourceConfigs = config.getConfigList("source");
+        // transform is optional: without it, sinks read the first source stream directly
+        this.transformConfigs = config.hasPath("transform")
+            ? config.getConfigList("transform")
+            : Collections.emptyList();
+        this.sinkConfigs = config.getConfigList("sink");
+
+        this.sources = initializeSources(sourceConfigs);
+        this.transforms = initializeTransforms(transformConfigs);
+        this.sinks = initializeSinks(sinkConfigs);
+        flinkEnvironment.registerPlugin(pluginJars);
+    }
+
+    /**
+     * Builds the source, transform and sink pipeline and submits it via
+     * {@link StreamExecutionEnvironment#execute()}.
+     *
+     * @throws IllegalStateException if no source plugin is configured
+     * @throws Exception if Flink fails to build or run the job
+     */
+    public void execute() throws Exception {
+        if (sources.isEmpty()) {
+            throw new IllegalStateException("At least one source plugin must be configured");
+        }
+        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+        List<DataStream<Row>> data = new ArrayList<>();
+        for (int i = 0; i < sources.size(); i++) {
+            DataStreamSource<Row> sourceStream = executionEnvironment.addSource(sources.get(i));
+            data.add(sourceStream);
+            registerResultTable(sourceConfigs.get(i), sourceStream);
+        }
+
+        // A step without a source table reads the previous step's output, starting with the first source.
+        DataStream<Row> input = data.get(0);
+
+        for (int i = 0; i < transforms.size(); i++) {
+            FlinkStreamTransform transform = transforms.get(i);
+            Config transformConfig = transformConfigs.get(i);
+            transform.setConfig(transformConfig);
+            transform.prepare(flinkEnvironment);
+            DataStream<Row> stream = fromSourceTable(transformConfig).orElse(input);
+            input = transform.processStream(flinkEnvironment, stream);
+            registerResultTable(transformConfig, input);
+            transform.registerFunction(flinkEnvironment);
+        }
+
+        for (int i = 0; i < sinks.size(); i++) {
+            DataStream<Row> stream = fromSourceTable(sinkConfigs.get(i)).orElse(input);
+            stream.sinkTo(sinks.get(i));
+        }
+
+        // Capture the plan before submitting: Flink's execute() clears the environment's
+        // transformations, so getExecutionPlan() can no longer be generated afterwards.
+        executionPlan = executionEnvironment.getExecutionPlan();
+        executionEnvironment.execute();
+    }
+
+    /** Discovers each configured sink plugin, records its jars and converts it to a Flink {@link Sink}. */
+    private List<Sink<Row, Object, Object, Object>> initializeSinks(List<? extends Config> sinkConfigs) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
+        List<Sink<Row, Object, Object, Object>> flinkSinks = new ArrayList<>(sinkConfigs.size());
+        for (Config sinkConfig : sinkConfigs) {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "sink",
+                sinkConfig.getString("plugin_name"));
+            pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            SeaTunnelSink<SeaTunnelRow, Object, Object, Object> pluginInstance = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            flinkSinks.add(flinkSinkConverter.convert(pluginInstance, Collections.emptyMap()));
+        }
+        return flinkSinks;
+    }
+
+    /** Discovers each configured transform plugin, records its jars and casts it to {@link FlinkStreamTransform}. */
+    private List<FlinkStreamTransform> initializeTransforms(List<? extends Config> transformConfigs) {
+        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
+        List<FlinkStreamTransform> flinkTransforms = new ArrayList<>(transformConfigs.size());
+        for (Config transformConfig : transformConfigs) {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "transform",
+                transformConfig.getString("plugin_name"));
+            pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            flinkTransforms.add((FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier));
+        }
+        return flinkTransforms;
+    }
+
+    /** Discovers each configured source plugin, records its jars and wraps it in a {@link SeaTunnelParallelSource}. */
+    private List<SeaTunnelParallelSource> initializeSources(List<? extends Config> sourceConfigs) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        List<SeaTunnelParallelSource> seaTunnelSources = new ArrayList<>(sourceConfigs.size());
+        for (Config sourceConfig : sourceConfigs) {
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+                "seatunnel",
+                "source",
+                sourceConfig.getString("plugin_name"));
+            pluginJars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
+            seaTunnelSources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+        }
+        return seaTunnelSources;
+    }
+
+    /**
+     * Registers {@code dataStream} under the plugin's {@code RESULT_TABLE_NAME}, if that option is set,
+     * using the plugin's {@code field_name} as the field list when present. A table name that is
+     * already registered is left untouched.
+     */
+    private void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
+        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
+            String name = pluginConfig.getString(RESULT_TABLE_NAME);
+            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+            if (!TableUtil.tableExists(tableEnvironment, name)) {
+                if (pluginConfig.hasPath("field_name")) {
+                    String fieldName = pluginConfig.getString("field_name");
+                    tableEnvironment.registerDataStream(name, dataStream, fieldName);
+                } else {
+                    tableEnvironment.registerDataStream(name, dataStream);
+                }
+            }
+        }
+    }
+
+    /**
+     * Converts the table named by the plugin's {@code SOURCE_TABLE_NAME} option back into a
+     * {@link DataStream}, or returns empty when the option is not set.
+     */
+    private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
+        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
+            StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
+            Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
+            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns the Flink execution plan of this job.
+     *
+     * <p>After {@link #execute()} has built the pipeline, the plan captured before submission is
+     * returned; before that, the plan is requested from the environment directly.
+     */
+    public String getExecutionPlan() {
+        if (executionPlan != null) {
+            return executionPlan;
+        }
+        return flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan();
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
index 3f13e4b5..3e206754 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
@@ -32,7 +32,7 @@ import java.nio.file.Paths;
public class SeaTunnelApiExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
- String configFile = getTestConfigFile("/examples/fake_to_console.conf");
+ String configFile = getTestConfigFile("/examples/seatunnel_fake_to_console.conf");
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
flinkCommandArgs.setCheckConfig(false);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf
new file mode 100644
index 00000000..4c0fb709
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/seatunnel_fake_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates a streaming job on the new SeaTunnel API: FakeSource -> sql transform -> Console
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like more information about how to configure SeaTunnel, and to see the full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like more information about how to configure SeaTunnel, and to see the full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+
+ # If you would like more information about how to configure SeaTunnel, and to see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file