You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/11 09:21:59 UTC

[incubator-seatunnel] branch api-draft updated: Add SeatunnelTaskExecuteCommand (#1847)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 8ff90fdc Add SeatunnelTaskExecuteCommand (#1847)
8ff90fdc is described below

commit 8ff90fdcfbadb5e41e870c24f942b9be102c28a5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 11 17:21:53 2022 +0800

    Add SeatunnelTaskExecuteCommand (#1847)
---
 docs/en/images/seatunnel_starter.png               | Bin 0 -> 423840 bytes
 .../api/sink/{Sink.java => SeaTunnelSink.java}     |   2 +-
 .../source/{Source.java => SeaTunnelSource.java}   |   2 +-
 .../seatunnel/api/table/connector/TableSink.java   |   4 +-
 .../seatunnel/api/table/connector/TableSource.java |   4 +-
 .../seatunnel/api/table/factory/FactoryUtil.java   |  12 ++---
 .../apache/seatunnel/apis/base/plugin/Plugin.java  |  14 +++---
 seatunnel-core/README.md                           |   8 +++
 .../flink/command/SeaTunnelTaskExecuteCommand.java |  56 +++++++++++++++++++++
 .../seatunnel/translation/sink/SinkConverter.java  |   6 ++-
 .../translation/source/ParallelSource.java         |  12 +++--
 .../translation/flink/sink/FlinkSink.java          |   5 +-
 .../translation/flink/sink/FlinkSinkConverter.java |  14 +++---
 .../flink/source/SeaTunnelParallelSource.java      |   7 +--
 14 files changed, 109 insertions(+), 37 deletions(-)

diff --git a/docs/en/images/seatunnel_starter.png b/docs/en/images/seatunnel_starter.png
new file mode 100644
index 00000000..4d970089
Binary files /dev/null and b/docs/en/images/seatunnel_starter.png differ
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
similarity index 95%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cd7ba6ee..cb6a4247 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
-public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
     SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
similarity index 95%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7d77f0d7..729753fc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  * @param <T>      The type of records produced by the source.
  * @param <SplitT> The type of splits handled by the source.
  */
-public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable {
 
     /**
      * Get the boundedness of this source.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index fe0be552..9cd8efbe 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.api.table.connector;
 
-import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
 
 public interface TableSink {
 
-    Sink<?, ?, ?, ?> createSink();
+    SeaTunnelSink<?, ?, ?, ?> createSink();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index 2267ab1f..7727735a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.api.table.connector;
 
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 
 public interface TableSource {
 
-    Source<?, ?, ?> createSource();
+    SeaTunnelSource<?, ?, ?> createSource();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 82e84333..b0677676 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-import org.apache.seatunnel.api.sink.Sink;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSource;
@@ -41,7 +41,7 @@ public final class FactoryUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
 
-    public static List<Source> createAndPrepareSource(
+    public static List<SeaTunnelSource> createAndPrepareSource(
             List<CatalogTable> multipleTables,
             Map<String, String> options,
             ClassLoader classLoader,
@@ -50,7 +50,7 @@ public final class FactoryUtil {
         try {
 
             final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
-            List<Source> sources = new ArrayList<>(multipleTables.size());
+            List<SeaTunnelSource> sources = new ArrayList<>(multipleTables.size());
             if (factory instanceof SupportMultipleTable) {
                 TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
                 SupportMultipleTable multipleTableSourceFactory = (SupportMultipleTable) factory;
@@ -58,7 +58,7 @@ public final class FactoryUtil {
                 SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
                 TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
                 // TODO: handle reading metadata
-                Source<?, ?, ?> source = multipleTableSource.createSource();
+                SeaTunnelSource<?, ?, ?> source = multipleTableSource.createSource();
                 sources.add(source);
             }
             return sources;
@@ -70,7 +70,7 @@ public final class FactoryUtil {
         }
     }
 
-    public static List<Sink> createAndPrepareSink() {
+    public static List<SeaTunnelSink> createAndPrepareSink() {
         return null;
     }
 
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
index 63370c44..6cf048c5 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
@@ -28,13 +28,13 @@ import java.io.Serializable;
  * A base interface indicates belonging to SeaTunnel.
  * Plugin will be used as follows:
  * <pre>{@code
- *      Plugin<?> plugin = new PluginA<>();
- *      plugin.setConfig(Config);
- *      CheckResult checkResult = plugin.checkConfig();
- *      if (checkResult.getSuccess()) {
- *         plugin.prepare();
- *         // plugin execute code
- *         plugin.close();
+ *      try(Plugin<?> plugin = new PluginA<>()) {
+ *          plugin.setConfig(Config);
+ *          CheckResult checkResult = plugin.checkConfig();
+ *          if (checkResult.getSuccess()) {
+ *              plugin.prepare();
+ *              // plugin execute code
+ *          }
  *      }
  *
  * }</pre>
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
new file mode 100644
index 00000000..c4b48e4c
--- /dev/null
+++ b/seatunnel-core/README.md
@@ -0,0 +1,8 @@
+# Introduction
+
+This module is the SeaTunnel job entry point. SeaTunnel jobs are started by the process shown below.
+![seatunnel_starter.png](../docs/en/images/seatunnel_starter.png)
+
+- seatunnel-core-flink: The Flink job starter.
+- seatunnel-core-flink-sql: The Flink SQL job starter.
+- seatunnel-core-spark: The Spark job starter.
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
new file mode 100644
index 00000000..c58884f5
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Command that runs a SeaTunnel task as a Flink job via the new API; only config loading is implemented so far.
+ */
+public class SeaTunnelTaskExecuteCommand implements Command<FlinkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecuteCommand.class);
+
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    public SeaTunnelTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+        this.flinkCommandArgs = flinkCommandArgs;
+    }
+
+    @Override
+    public void execute() {
+        EngineType engine = flinkCommandArgs.getEngineType();
+        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
+
+        Config config = new ConfigBuilder<>(configFile, engine).getConfig();
+        // TODO: initialize the new plugin.
+        // TODO: translate the plugin to a Flink source/sink.
+        // TODO: execute the Flink job.
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
index 7b2e6987..574f0ec6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
@@ -17,14 +17,16 @@
 
 package org.apache.seatunnel.translation.sink;
 
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+
 import java.util.Map;
 
 public interface SinkConverter<SeaTunnelSinkT, TargetSinkT> {
 
     /**
-     * Convert SeaTunnel {@link org.apache.seatunnel.api.sink.Sink} to target sink.
+     * Convert SeaTunnel {@link SeaTunnelSink} to target sink.
      *
-     * @param sink1         SeaTunnel {@link org.apache.seatunnel.api.sink.Sink}.
+     * @param sink1         SeaTunnel {@link SeaTunnelSink}.
      * @param configuration sink configuration.
      * @return target sink.
      */
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 3a072902..af67eb66 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.translation.source;
 
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 
 public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements AutoCloseable, CheckpointListener {
 
-    protected final Source<T, SplitT, StateT> source;
+    protected final SeaTunnelSource<T, SplitT, StateT> source;
     protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
     protected final ParallelReaderContext readerContext;
     protected final Integer subtaskId;
@@ -54,7 +54,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements Au
      */
     private volatile boolean running = true;
 
-    public ParallelSource(Source<T, SplitT, StateT> source,
+    public ParallelSource(SeaTunnelSource<T, SplitT, StateT> source,
                           List<byte[]> restoredState,
                           int parallelism,
                           int subtaskId) {
@@ -110,8 +110,10 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements Au
         // set ourselves as not running;
         // this would let the main discovery loop escape as soon as possible
         running = false;
-        splitEnumerator.close();
-        reader.close();
+        try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator;
+             SourceReader<T, SplitT> closedReader = reader) {
+            // just close the resources
+        }
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 994163c6..5fb88060 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 
@@ -35,10 +36,10 @@ import java.util.Optional;
 
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
 
-    private final org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink;
+    private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
     private final Map<String, String> configuration;
 
-    FlinkSink(org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink,
+    FlinkSink(SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink,
               Map<String, String> configuration) {
         this.sink = sink;
         this.configuration = configuration;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index c5ed3168..2456b9fe 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -17,21 +17,23 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
-import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.translation.sink.SinkConverter;
 
+import org.apache.flink.api.connector.sink.Sink;
+
 import java.util.Map;
 
 public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>
     implements SinkConverter<
-    Sink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
-    org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+    SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
+    Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
-        Sink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
-        return (org.apache.flink.api.connector.sink.Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
+    public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
+        SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+        return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
 
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index a51fbc7a..48fe0d1c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
-import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
 import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
@@ -48,7 +48,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
     private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelParallelSource.class);
     protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
 
-    protected final Source<SeaTunnelRow, ?, ?> source;
+    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
 
     protected transient ListState<byte[]> sourceState;
@@ -59,7 +59,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
      */
     private volatile boolean running = true;
 
-    public SeaTunnelParallelSource(Source<SeaTunnelRow, ?, ?> source) {
+    public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
         // TODO: Make sure the source is uncoordinated.
         this.source = source;
     }
@@ -101,6 +101,7 @@ public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedR
 
     @Override
     public TypeInformation<WrappedRow> getProducedType() {
+        // todo: add type transformation
         return new KryoTypeInfo<>(WrappedRow.class);
     }