You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 07:52:14 UTC
[incubator-seatunnel] branch api-draft updated: Add SeaTunnelAPIExample (#1867)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 96bd7031 Add SeaTunnelAPIExample (#1867)
96bd7031 is described below
commit 96bd7031fb59c65dabe31c4a3fe803f43512186c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 13 15:52:09 2022 +0800
Add SeaTunnelAPIExample (#1867)
* Add command implementation
* Add SeaTunnelAPIExample
---
.../seatunnel/console/sink/ConsoleSink.java | 29 -----
.../org.apache.seatunnel.api.sink.SeaTunnelSink | 17 +++
.../seatunnel/fake/source/FakeSource.java | 5 +-
.../seatunnel/fake/source/FakeSourceReader.java | 22 +++-
...org.apache.seatunnel.api.source.SeaTunnelSource | 17 +++
.../org/apache/seatunnel/core/base/Seatunnel.java | 3 +-
.../core/base/command/AbstractCommandArgs.java | 1 +
.../seatunnel/core/base/command/Command.java | 3 +-
.../{command/Command.java => config/ApiType.java} | 26 ++---
.../seatunnel/core/base/config/ConfigBuilder.java | 19 +---
.../Command.java => config/ConfigChecker.java} | 20 ++--
.../core/base/config/EnvironmentFactory.java | 1 +
.../CommandException.java} | 23 ++--
.../CommandExecuteException.java} | 23 ++--
.../ConfigCheckException.java} | 21 ++--
seatunnel-core/seatunnel-core-flink/pom.xml | 26 +++++
.../seatunnel/core/flink/SeatunnelFlink.java | 3 +-
.../core/flink/args/FlinkCommandArgs.java | 36 ++++++
...mmand.java => FlinkApiConfValidateCommand.java} | 14 ++-
...ommand.java => FlinkApiTaskExecuteCommand.java} | 13 ++-
.../core/flink/command/FlinkCommandBuilder.java | 32 +++++-
...d.java => SeaTunnelApiConfValidateCommand.java} | 27 ++---
.../command/SeaTunnelApiTaskExecuteCommand.java | 121 +++++++++++++++++++++
.../core/flink/config/FlinkApiConfigChecker.java | 46 ++++++++
.../flink/config/SeaTunnelApiConfigChecker.java} | 22 ++--
.../core/flink/config/SeaTunnelApiEnvironment.java | 48 ++++----
.../seatunnel/core/spark/SeatunnelSpark.java | 3 +-
.../apache/seatunnel/core/spark/SparkStarter.java | 2 +-
.../spark/command/SparkConfValidateCommand.java | 8 +-
.../spark/command/SparkTaskExecuteCommand.java | 7 +-
.../spark/config/SeaTunnelApiConfigChecker.java} | 22 ++--
.../core/spark/config/SeaTunnelEnvironment.java | 66 +++++++++++
.../core/spark/config/SparkApiConfigChecker.java | 46 ++++++++
.../seatunnel-flink-examples/pom.xml | 10 ++
.../seatunnel/example/flink/LocalFlinkExample.java | 3 +-
...lFlinkExample.java => SeaTunnelApiExample.java} | 7 +-
.../seatunnel/example/spark/LocalSparkExample.java | 3 +-
.../translation/flink/sink/FlinkSinkWriter.java | 3 +
38 files changed, 589 insertions(+), 209 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 947b3125..d7eab44b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,16 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
import java.util.List;
-import java.util.Optional;
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
@@ -40,29 +36,4 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
SinkWriter.Context context, List<ConsoleState> states) {
return restoreWriter(context, states);
}
-
- @Override
- public Optional<Serializer<ConsoleState>> getWriterStateSerializer() {
- return getWriterStateSerializer();
- }
-
- @Override
- public Optional<SinkCommitter<ConsoleCommitInfo>> createCommitter() {
- return createCommitter();
- }
-
- @Override
- public Optional<Serializer<ConsoleCommitInfo>> getCommitInfoSerializer() {
- return getCommitInfoSerializer();
- }
-
- @Override
- public Optional<SinkAggregatedCommitter<ConsoleCommitInfo, ConsoleAggregatedCommitInfo>> createAggregatedCommitter() {
- return createAggregatedCommitter();
- }
-
- @Override
- public Optional<Serializer<ConsoleAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
- return getAggregatedCommitInfoSerializer();
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
new file mode 100644
index 00000000..12b49983
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0c3fb31b..9f18668b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -22,9 +22,10 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
-public class FakeSource implements SeaTunnelSource<FakeSourceEvent, FakeSourceSplit, FakeState> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
@Override
public Boundedness getBoundedness() {
@@ -32,7 +33,7 @@ public class FakeSource implements SeaTunnelSource<FakeSourceEvent, FakeSourceSp
}
@Override
- public SourceReader<FakeSourceEvent, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
+ public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
return new FakeSourceReader(readerContext);
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index fa1c28c4..bf2cb2f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,18 +19,27 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
-public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourceSplit> {
+public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
private final SourceReader.Context context;
+ private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
+ private final int[] ages = {1024, 2048, 4096, 8192};
+ private final Random random = ThreadLocalRandom.current();
+
public FakeSourceReader(SourceReader.Context context) {
this.context = context;
}
@@ -47,8 +56,15 @@ public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourc
@Override
@SuppressWarnings("magicnumber")
- public void pollNext(Collector<FakeSourceEvent> output) {
- output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis()));
+ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+ Thread.sleep(1000L);
+ int i = random.nextInt(names.length);
+ Map<String, Object> fieldMap = new HashMap<>(4);
+ fieldMap.put("name", names[i]);
+ fieldMap.put("age", ages[i]);
+ fieldMap.put("timestamp", System.currentTimeMillis());
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i], ages[i], System.currentTimeMillis()}, fieldMap);
+ output.collect(seaTunnelRow);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
new file mode 100644
index 00000000..b21a0581
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index 644df716..e4587284 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.base;
import org.apache.seatunnel.apis.base.command.CommandArgs;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public class Seatunnel {
* @param command commandArgs
* @param <T> commandType
*/
- public static <T extends CommandArgs> void run(Command<T> command) {
+ public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
try {
command.execute();
} catch (ConfigRuntimeException e) {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 7f46fb03..a19fe285 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -37,6 +37,7 @@ public abstract class AbstractCommandArgs implements CommandArgs {
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
private List<String> variables = Collections.emptyList();
+ // todo: use command type enum
@Parameter(names = {"-t", "--check"},
description = "check config")
private boolean checkConfig = false;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
index 5cd80fc5..cca16c30 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.base.command;
import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.exception.CommandException;
/**
* Command interface.
@@ -30,6 +31,6 @@ public interface Command<T extends CommandArgs> {
/**
* Execute command
*/
- void execute();
+ void execute() throws CommandException;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
index 5cd80fc5..5468eee3 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public enum ApiType {
+ ENGINE_API("engine"),
+ SEATUNNEL_API("seatunnel"),
+ ;
+ private final String apiType;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+ ApiType(String apiType) {
+ this.apiType = apiType;
+ }
+ public String getApiType() {
+ return apiType;
+ }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 605613b0..758ec8c8 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,18 +34,16 @@ import java.nio.file.Path;
*
* @param <ENVIRONMENT> environment type.
*/
-public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
+public class ConfigBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
- private final EngineType engine;
private final Config config;
- public ConfigBuilder(Path configFile, EngineType engine) {
+ public ConfigBuilder(Path configFile) {
this.configFile = configFile;
- this.engine = engine;
this.config = load();
}
@@ -75,16 +72,4 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
return config;
}
- /**
- * check if config is valid.
- **/
- public void checkConfig() {
- // check environment
- ENVIRONMENT environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
- // check plugins
- PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config, engine);
- pluginFactory.createPlugins(PluginType.SOURCE);
- pluginFactory.createPlugins(PluginType.TRANSFORM);
- pluginFactory.createPlugins(PluginType.SINK);
- }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
similarity index 60%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
index 5cd80fc5..30e4243e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
@@ -15,21 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
/**
- * Command interface.
+ * Check the config is valid.
*
- * @param <T> args type
+ * @param <ENVIRONMENT> the environment type.
*/
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+public interface ConfigChecker<ENVIRONMENT extends RuntimeEnv> {
/**
- * Execute command
+ * Check if the config is validated, if check fails, throw exception.
+ *
+ * @param config given config.
*/
- void execute();
+ void checkConfig(Config config) throws ConfigCheckException;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index ff450836..9daa23b7 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -43,6 +43,7 @@ public class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
this.engine = engine;
}
+ // todo:put this method into submodule to avoid dependency on the engine
public synchronized ENVIRONMENT getEnvironment() {
Config envConfig = config.getConfig("env");
boolean enableHive = checkIsContainHive();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
index 5cd80fc5..33f2013b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
@@ -15,21 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+public class CommandException extends Exception {
+ public CommandException(String message) {
+ super(message);
+ }
+ public CommandException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
similarity index 72%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
index 5cd80fc5..a3a8ee42 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
@@ -15,21 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+public class CommandExecuteException extends CommandException {
+ public CommandExecuteException(String message) {
+ super(message);
+ }
+ public CommandExecuteException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
index 5cd80fc5..22d193bf 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
@@ -15,21 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public class ConfigCheckException extends CommandException {
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+ public ConfigCheckException(String message) {
+ super(message);
+ }
- /**
- * Execute command
- */
- void execute();
+ public ConfigCheckException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index d985628a..d5101a41 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -42,6 +42,32 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-flink</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- todo: use another module to execute new API? -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-flink</artifactId>
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index 45792d00..f1db4a15 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
@@ -26,7 +27,7 @@ import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
public class SeatunnelFlink {
- public static void main(String[] args) {
+ public static void main(String[] args) throws CommandException {
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index fb892f9c..5b27fcfd 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink.args;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.config.ApiType;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.flink.config.FlinkRunMode;
@@ -34,6 +35,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
+ @Parameter(names = {"-api", "--api-type"},
+ converter = ApiTypeConverter.class,
+ description = "Api type, engine or seatunnel")
+ private ApiType apiType = ApiType.ENGINE_API;
+
/**
* Undefined parameters parsed will be stored here as flink command parameters.
*/
@@ -65,6 +71,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.flinkParams = flinkParams;
}
+ public ApiType getApiType() {
+ return apiType;
+ }
+
+ public void setApiType(ApiType apiType) {
+ this.apiType = apiType;
+ }
+
/**
* Used to convert the run mode string to the enum value.
*/
@@ -86,4 +100,26 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
}
}
+ /**
+ * Used to convert the api type string to the enum value.
+ */
+ private static class ApiTypeConverter implements IStringConverter<ApiType> {
+
+ /**
+ * If the '-api' is not set, then will not go into this convert method.
+ *
+ * @param value input value set by '-api' or '--api-type'
+ * @return api type enum value
+ */
+ @Override
+ public ApiType convert(String value) {
+ for (ApiType apiType : ApiType.values()) {
+ if (apiType.getApiType().equalsIgnoreCase(value)) {
+ return apiType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("API type %s not supported", value));
+ }
+ }
+
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
similarity index 74%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
index 5868e252..f03522a0 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.flink.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.core.flink.config.FlinkApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,20 +32,21 @@ import java.nio.file.Path;
/**
* Used to check the Flink conf is validated.
*/
-public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
+public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
private final FlinkCommandArgs flinkCommandArgs;
- public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
+ public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
+ public void execute() throws ConfigCheckException {
Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
- new ConfigBuilder<FlinkEnvironment>(configPath, flinkCommandArgs.getEngineType()).checkConfig();
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
similarity index 84%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index 32d13440..345f8414 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -36,22 +37,22 @@ import java.nio.file.Path;
import java.util.List;
/**
- * Used to execute Flink Job.
+ * Used to execute Flink Job by Flink API.
*/
-public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
private final FlinkCommandArgs flinkCommandArgs;
- public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
+ public void execute() throws CommandExecuteException {
EngineType engine = flinkCommandArgs.getEngineType();
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
- Config config = new ConfigBuilder<>(configFile, engine).getConfig();
+ Config config = new ConfigBuilder(configFile).getConfig();
ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
@@ -68,7 +69,7 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
} catch (Exception e) {
- throw new RuntimeException("Execute Flink task error", e);
+ throw new CommandExecuteException("Execute Flink task error", e);
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 595f01a5..590e7d09 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -30,7 +30,35 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
throw new IllegalArgumentException(
String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
}
- return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand(commandArgs)
- : new FlinkTaskExecuteCommand(commandArgs);
+ switch (commandArgs.getApiType()) {
+ case ENGINE_API:
+ return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+ case SEATUNNEL_API:
+ return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+ default:
+ throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
+ }
+ }
+
+ /**
+ * Used to generate command for engine API.
+ */
+ private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
+ @Override
+ public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
+ : new FlinkApiTaskExecuteCommand(commandArgs);
+ }
+ }
+
+ /**
+ * Used to generate command for seaTunnel API.
+ */
+ private static class SeaTunnelApiCommandBuilder extends FlinkCommandBuilder {
+ @Override
+ public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
+ : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+ }
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
similarity index 64%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
index c58884f5..e9c93411 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
@@ -19,11 +19,10 @@ package org.apache.seatunnel.core.flink.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.core.flink.config.SeaTunnelApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,26 +30,24 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
/**
- * Used to execute a SeaTunnelTask. This command is used to execute the Flink job by new API.
+ * Use to validate the configuration of the SeaTunnel API.
*/
-public class SeaTunnelTaskExecuteCommand implements Command<FlinkCommandArgs> {
+public class SeaTunnelApiConfValidateCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
private final FlinkCommandArgs flinkCommandArgs;
- public SeaTunnelTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ public SeaTunnelApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
- EngineType engine = flinkCommandArgs.getEngineType();
- Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
-
- Config config = new ConfigBuilder<>(configFile, engine).getConfig();
- // initialize the new plugin.
- // translate plugin to flink source/sink
- // execute the flink job
+ public void execute() throws ConfigCheckException {
+ Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
+ // todo: validate the config by new api
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+ LOGGER.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 00000000..e767df5f
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * This command is used to execute the Flink job by SeaTunnel new API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
+
+ private final FlinkCommandArgs flinkCommandArgs;
+
+ public SeaTunnelApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ this.flinkCommandArgs = flinkCommandArgs;
+ }
+
+ @Override
+ public void execute() throws CommandExecuteException {
+ EngineType engine = flinkCommandArgs.getEngineType();
+ Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
+
+ Config config = new ConfigBuilder(configFile).getConfig();
+ // todo: add basic type
+ SeaTunnelParallelSource source = getSource();
+ Sink<WrappedRow, Object, Object, Object> flinkSink = getSink();
+ // execute the flink job
+ FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+ StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+ DataStreamSource<WrappedRow> dataStream = streamExecutionEnvironment.addSource(source);
+ dataStream.sinkTo(flinkSink);
+ try {
+ streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
+ } catch (Exception e) {
+ throw new CommandExecuteException("SeaTunnelAPITaskExecuteCommand execute failed", e);
+ }
+ }
+
+ private SeaTunnelParallelSource getSource() {
+ return new SeaTunnelParallelSource(loadSourcePlugin());
+ }
+
+ private Sink<WrappedRow, Object, Object, Object> getSink() {
+ SeaTunnelSink<SeaTunnelRow, Object, Object, Object> sink = loadSinkPlugin();
+ FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
+ return flinkSinkConverter.convert(sink, Collections.emptyMap());
+ }
+
+ private <T, SplitT extends SourceSplit, StateT> SeaTunnelSource<T, SplitT, StateT> loadSourcePlugin() {
+ // todo: use FactoryUtils to load the plugin
+ ServiceLoader<SeaTunnelSource> serviceLoader = ServiceLoader.load(SeaTunnelSource.class);
+ Iterator<SeaTunnelSource> iterator = serviceLoader.iterator();
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ throw new IllegalArgumentException("Cannot find the plugin.");
+ }
+
+ private <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> loadSinkPlugin() {
+ // todo: use FactoryUtils to load the plugin
+ ServiceLoader<SeaTunnelSink> serviceLoader = ServiceLoader.load(SeaTunnelSink.class);
+ Iterator<SeaTunnelSink> iterator = serviceLoader.iterator();
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ throw new IllegalArgumentException("Cannot find the plugin.");
+ }
+
+ private FlinkEnvironment getFlinkEnvironment(Config config) {
+ FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
+ flinkEnvironment.setJobMode(JobMode.STREAMING);
+ flinkEnvironment.setConfig(config);
+ flinkEnvironment.prepare();
+
+ return flinkEnvironment;
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
new file mode 100644
index 00000000..7dca74ec
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class FlinkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ try {
+ // check environment
+ FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ // check plugins
+ PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
+ pluginFactory.createPlugins(PluginType.SOURCE);
+ pluginFactory.createPlugins(PluginType.TRANSFORM);
+ pluginFactory.createPlugins(PluginType.SINK);
+ } catch (Exception ex) {
+ throw new ConfigCheckException("Config check fail", ex);
+ }
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..7f0a722f 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.flink.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
- /**
- * Execute command
- */
- void execute();
+public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelApiEnvironment> {
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ // todo: implement
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
similarity index 55%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
index fa1c28c4..7d0d5956 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
@@ -15,59 +15,61 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.core.flink.config;
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URL;
import java.util.List;
-public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourceSplit> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
+public class SeaTunnelApiEnvironment implements RuntimeEnv {
- private final SourceReader.Context context;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiEnvironment.class);
- public FakeSourceReader(SourceReader.Context context) {
- this.context = context;
- }
+ private Config config;
@Override
- public void open() {
-
+ public SeaTunnelApiEnvironment setConfig(Config config) {
+ this.config = config;
+ return this;
}
@Override
- public void close() {
-
+ public Config getConfig() {
+ return config;
}
@Override
- @SuppressWarnings("magicnumber")
- public void pollNext(Collector<FakeSourceEvent> output) {
- output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis()));
+ public CheckResult checkConfig() {
+ // todo
+ return null;
}
@Override
- public List<FakeSourceSplit> snapshotState(long checkpointId) {
+ public SeaTunnelApiEnvironment prepare() {
+ // todo
return null;
}
@Override
- public void addSplits(List<FakeSourceSplit> splits) {
-
+ public SeaTunnelApiEnvironment setJobMode(JobMode mode) {
+ return null;
}
@Override
- public void handleNoMoreSplits() {
-
+ public JobMode getJobMode() {
+ return null;
}
@Override
- public void notifyCheckpointComplete(long checkpointId) {
+ public void registerPlugin(List<URL> pluginPaths) {
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index d88cc7ec..05a1a1d1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -19,13 +19,14 @@ package org.apache.seatunnel.core.spark;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
public class SeatunnelSpark {
- public static void main(String[] args) {
+ public static void main(String[] args) throws CommandException {
SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
Command<SparkCommandArgs> sparkCommand =
new SparkCommandBuilder().buildCommand(sparkArgs);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 81f44ba9..41b53080 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -220,7 +220,7 @@ public class SparkStarter implements Starter {
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
return Collections.emptyList();
}
- Config config = new ConfigBuilder<>(Paths.get(commandArgs.getConfigFile()), EngineType.SPARK).getConfig();
+ Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config, EngineType.SPARK);
return pluginFactory.getPluginJarPaths().stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 5629e51d..0f4a9307 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.spark.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.core.spark.config.SparkApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +43,10 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
}
@Override
- public void execute() {
+ public void execute() throws ConfigCheckException {
Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
- new ConfigBuilder<SparkEnvironment>(confPath, sparkCommandArgs.getEngineType()).checkConfig();
+ ConfigBuilder configBuilder = new ConfigBuilder(confPath);
+ new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index c8aebd4b..9a051416 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.spark.SparkEnvironment;
@@ -44,11 +45,11 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
}
@Override
- public void execute() {
+ public void execute() throws CommandExecuteException {
EngineType engine = sparkCommandArgs.getEngineType();
Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
- Config config = new ConfigBuilder<>(confFile, engine).getConfig();
+ Config config = new ConfigBuilder(confFile).getConfig();
ExecutionContext<SparkEnvironment> executionContext = new ExecutionContext<>(config, engine);
List<BaseSource<SparkEnvironment>> sources = executionContext.getSources();
@@ -66,7 +67,7 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
} catch (Exception e) {
- throw new RuntimeException("Execute Spark task error", e);
+ throw new CommandExecuteException("Execute Spark task error", e);
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..3715c14b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.spark.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
- /**
- * Execute command
- */
- void execute();
+public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ // todo: implement
+ }
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
new file mode 100644
index 00000000..6f9e0135
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
@@ -0,0 +1,66 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.List;
+
+public class SeaTunnelEnvironment implements RuntimeEnv {
+
+ @Override
+ public SeaTunnelEnvironment setConfig(Config config) {
+ return null;
+ }
+
+ @Override
+ public Config getConfig() {
+ return null;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelEnvironment prepare() {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelEnvironment setJobMode(JobMode mode) {
+ return null;
+ }
+
+ @Override
+ public JobMode getJobMode() {
+ return null;
+ }
+
+ @Override
+ public void registerPlugin(List<URL> pluginPaths) {
+
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
new file mode 100644
index 00000000..26de5794
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class SparkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ try {
+ // check environment
+ FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ // check plugins
+ PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
+ pluginFactory.createPlugins(PluginType.SOURCE);
+ pluginFactory.createPlugins(PluginType.TRANSFORM);
+ pluginFactory.createPlugins(PluginType.SINK);
+ } catch (Exception ex) {
+ throw new ConfigCheckException("Config check fail", ex);
+ }
+ }
+}
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index 79fbe8f1..bd5caf5e 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -56,6 +56,16 @@
<artifactId>seatunnel-connector-flink-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index a740b8ee..b41236e5 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.example.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
@@ -29,7 +30,7 @@ import java.nio.file.Paths;
public class LocalFlinkExample {
- public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
String configFile = getTestConfigFile("/examples/fake_to_console.conf");
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
similarity index 88%
copy from seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
copy to seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
index a740b8ee..3f13e4b5 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.example.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ApiType;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
@@ -27,14 +29,15 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
-public class LocalFlinkExample {
+public class SeaTunnelApiExample {
- public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
String configFile = getTestConfigFile("/examples/fake_to_console.conf");
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
flinkCommandArgs.setCheckConfig(false);
flinkCommandArgs.setVariables(null);
+ flinkCommandArgs.setApiType(ApiType.SEATUNNEL_API);
Command<FlinkCommandArgs> flinkCommand =
new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index 5469ec73..aa121a90 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.example.spark;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
@@ -30,7 +31,7 @@ import java.nio.file.Paths;
public class LocalSparkExample {
- public static void main(String[] args) throws URISyntaxException, FileNotFoundException {
+ public static void main(String[] args) throws URISyntaxException, FileNotFoundException, CommandException {
String configFile = getTestConfigFile("/examples/spark.batch.conf");
SparkCommandArgs sparkArgs = new SparkCommandArgs();
sparkArgs.setConfigFile(configFile);
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 354de014..bfdb2db4 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.types.Row;
@@ -41,6 +42,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context) throws IOException {
if (element instanceof Row) {
sinkWriter.write(rowSerialization.deserialize((Row) element));
+ } else if (element instanceof WrappedRow) {
+ sinkWriter.write(rowSerialization.deserialize(((WrappedRow) element).getRow()));
} else {
throw new InvalidClassException("only support Flink Row at now, the element Class is " + element.getClass());
}