You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/11 09:21:59 UTC
[incubator-seatunnel] branch api-draft updated: Add SeatunnelTaskExecuteCommand (#1847)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 8ff90fdc Add SeatunnelTaskExecuteCommand (#1847)
8ff90fdc is described below
commit 8ff90fdcfbadb5e41e870c24f942b9be102c28a5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 11 17:21:53 2022 +0800
Add SeatunnelTaskExecuteCommand (#1847)
---
docs/en/images/seatunnel_starter.png | Bin 0 -> 423840 bytes
.../api/sink/{Sink.java => SeaTunnelSink.java} | 2 +-
.../source/{Source.java => SeaTunnelSource.java} | 2 +-
.../seatunnel/api/table/connector/TableSink.java | 4 +-
.../seatunnel/api/table/connector/TableSource.java | 4 +-
.../seatunnel/api/table/factory/FactoryUtil.java | 12 ++---
.../apache/seatunnel/apis/base/plugin/Plugin.java | 14 +++---
seatunnel-core/README.md | 8 +++
.../flink/command/SeaTunnelTaskExecuteCommand.java | 56 +++++++++++++++++++++
.../seatunnel/translation/sink/SinkConverter.java | 6 ++-
.../translation/source/ParallelSource.java | 12 +++--
.../translation/flink/sink/FlinkSink.java | 5 +-
.../translation/flink/sink/FlinkSinkConverter.java | 14 +++---
.../flink/source/SeaTunnelParallelSource.java | 7 +--
14 files changed, 109 insertions(+), 37 deletions(-)
diff --git a/docs/en/images/seatunnel_starter.png b/docs/en/images/seatunnel_starter.png
new file mode 100644
index 00000000..4d970089
Binary files /dev/null and b/docs/en/images/seatunnel_starter.png differ
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
similarity index 95%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cd7ba6ee..cb6a4247 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
import java.util.List;
import java.util.Optional;
-public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
similarity index 95%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7d77f0d7..729753fc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
* @param <T> The type of records produced by the source.
* @param <SplitT> The type of splits handled by the source.
*/
-public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable {
/**
* Get the boundedness of this source.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index fe0be552..9cd8efbe 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.api.table.connector;
-import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
public interface TableSink {
- Sink<?, ?, ?, ?> createSink();
+ SeaTunnelSink<?, ?, ?, ?> createSink();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index 2267ab1f..7727735a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.api.table.connector;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
public interface TableSource {
- Source<?, ?, ?> createSource();
+ SeaTunnelSource<?, ?, ?> createSource();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 82e84333..b0677676 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.api.table.factory;
-import org.apache.seatunnel.api.sink.Sink;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
@@ -41,7 +41,7 @@ public final class FactoryUtil {
private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
- public static List<Source> createAndPrepareSource(
+ public static List<SeaTunnelSource> createAndPrepareSource(
List<CatalogTable> multipleTables,
Map<String, String> options,
ClassLoader classLoader,
@@ -50,7 +50,7 @@ public final class FactoryUtil {
try {
final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
- List<Source> sources = new ArrayList<>(multipleTables.size());
+ List<SeaTunnelSource> sources = new ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
SupportMultipleTable multipleTableSourceFactory = (SupportMultipleTable) factory;
@@ -58,7 +58,7 @@ public final class FactoryUtil {
SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
// TODO: handle reading metadata
- Source<?, ?, ?> source = multipleTableSource.createSource();
+ SeaTunnelSource<?, ?, ?> source = multipleTableSource.createSource();
sources.add(source);
}
return sources;
@@ -70,7 +70,7 @@ public final class FactoryUtil {
}
}
- public static List<Sink> createAndPrepareSink() {
+ public static List<SeaTunnelSink> createAndPrepareSink() {
return null;
}
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
index 63370c44..6cf048c5 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
@@ -28,13 +28,13 @@ import java.io.Serializable;
* A base interface indicates belonging to SeaTunnel.
* Plugin will be used as follows:
* <pre>{@code
- * Plugin<?> plugin = new PluginA<>();
- * plugin.setConfig(Config);
- * CheckResult checkResult = plugin.checkConfig();
- * if (checkResult.getSuccess()) {
- * plugin.prepare();
- * // plugin execute code
- * plugin.close();
+ * try(Plugin<?> plugin = new PluginA<>()) {
+ * plugin.setConfig(Config);
+ * CheckResult checkResult = plugin.checkConfig();
+ * if (checkResult.getSuccess()) {
+ * plugin.prepare();
+ * // plugin execute code
+ * }
* }
*
* }</pre>
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
new file mode 100644
index 00000000..c4b48e4c
--- /dev/null
+++ b/seatunnel-core/README.md
@@ -0,0 +1,8 @@
+# Introduction
+
+This module is the SeaTunnel job entrypoint. SeaTunnel jobs are started by the process shown below.
+![seatunnel-workflow.svg](../docs/en/images/seatunnel_starter.png)
+
+- seatunnel-core-flink: The Flink job starter.
+- seatunnel-core-flink-sql: The Flink SQL job starter.
+- seatunnel-core-spark: The Spark job starter.
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
new file mode 100644
index 00000000..c58884f5
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Command that executes a SeaTunnel task as a Flink job through the new SeaTunnel API.
+ */
+public class SeaTunnelTaskExecuteCommand implements Command<FlinkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecuteCommand.class);
+
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    public SeaTunnelTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+        this.flinkCommandArgs = flinkCommandArgs;
+    }
+
+    @Override
+    public void execute() {
+        EngineType engine = flinkCommandArgs.getEngineType();
+        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
+
+        Config config = new ConfigBuilder<>(configFile, engine).getConfig();
+        // TODO: initialize the new plugin.
+        // TODO: translate the plugin to a Flink source/sink.
+        // TODO: execute the Flink job.
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
index 7b2e6987..574f0ec6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
@@ -17,14 +17,16 @@
package org.apache.seatunnel.translation.sink;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+
import java.util.Map;
public interface SinkConverter<SeaTunnelSinkT, TargetSinkT> {
/**
- * Convert SeaTunnel {@link org.apache.seatunnel.api.sink.Sink} to target sink.
+ * Convert SeaTunnel {@link SeaTunnelSink} to target sink.
*
- * @param sink1 SeaTunnel {@link org.apache.seatunnel.api.sink.Sink}.
+ * @param sink1 SeaTunnel {@link SeaTunnelSink}.
* @param configuration sink configuration.
* @return target sink.
*/
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 3a072902..af67eb66 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.translation.source;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService;
public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements AutoCloseable, CheckpointListener {
- protected final Source<T, SplitT, StateT> source;
+ protected final SeaTunnelSource<T, SplitT, StateT> source;
protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
protected final ParallelReaderContext readerContext;
protected final Integer subtaskId;
@@ -54,7 +54,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements Au
*/
private volatile boolean running = true;
- public ParallelSource(Source<T, SplitT, StateT> source,
+ public ParallelSource(SeaTunnelSource<T, SplitT, StateT> source,
List<byte[]> restoredState,
int parallelism,
int subtaskId) {
@@ -110,8 +110,10 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements Au
// set ourselves as not running;
// this would let the main discovery loop escape as soon as possible
running = false;
- splitEnumerator.close();
- reader.close();
+ try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator;
+ SourceReader<T, SplitT> closedReader = reader) {
+ // just close the resources
+ }
}
// --------------------------------------------------------------------------------------------
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 994163c6..5fb88060 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
@@ -35,10 +36,10 @@ import java.util.Optional;
public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
- private final org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink;
+ private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
private final Map<String, String> configuration;
- FlinkSink(org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink,
+ FlinkSink(SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink,
Map<String, String> configuration) {
this.sink = sink;
this.configuration = configuration;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index c5ed3168..2456b9fe 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -17,21 +17,23 @@
package org.apache.seatunnel.translation.flink.sink;
-import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.translation.sink.SinkConverter;
+import org.apache.flink.api.connector.sink.Sink;
+
import java.util.Map;
public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>
implements SinkConverter<
- Sink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
- org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+ SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
+ Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
@Override
@SuppressWarnings("unchecked")
- public org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
- Sink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
- return (org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
+ public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
+ SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+ return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index a51fbc7a..48fe0d1c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.translation.flink.source;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
@@ -48,7 +48,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelParallelSource.class);
protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
- protected final Source<SeaTunnelRow, ?, ?> source;
+ protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
protected transient ListState<byte[]> sourceState;
@@ -59,7 +59,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
*/
private volatile boolean running = true;
- public SeaTunnelParallelSource(Source<SeaTunnelRow, ?, ?> source) {
+ public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
// TODO: Make sure the source is uncoordinated.
this.source = source;
}
@@ -101,6 +101,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
@Override
public TypeInformation<WrappedRow> getProducedType() {
+ // todo: add type transformation
return new KryoTypeInfo<>(WrappedRow.class);
}