You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/18 12:58:21 UTC

[incubator-seatunnel] branch api-draft updated: Add SeaTunnel plugin lifecycle (#1914)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new c0857382 Add SeaTunnel plugin lifecycle (#1914)
c0857382 is described below

commit c085738265bfc17cc7bc7b4e15a39a3e7f06d95a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 20:58:14 2022 +0800

    Add SeaTunnel plugin lifecycle (#1914)
---
 .../api/common/SeaTunnelPluginLifeCycle.java       | 35 ++++++++++++++++++++++
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   |  4 ++-
 .../seatunnel/api/source/SeaTunnelSource.java      |  4 ++-
 .../seatunnel/console/sink/ConsoleSink.java        |  8 +++++
 .../seatunnel/fake/source/FakeSource.java          |  9 ++++++
 .../execution/AbstractPluginExecuteProcessor.java  |  2 ++
 .../core/flink/execution/SinkExecuteProcessor.java | 12 ++++----
 .../flink/execution/SourceExecuteProcessor.java    |  9 +++---
 .../flink/execution/TransformExecuteProcessor.java |  8 ++---
 9 files changed, 76 insertions(+), 15 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
new file mode 100644
index 00000000..e0309e75
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+/**
+ * This interface defines the life cycle of a plugin: after a plugin is created,
+ * its prepare method is executed to perform initialization.
+ */
+public interface SeaTunnelPluginLifeCycle {
+
+    /**
+     * Use the pluginConfig to perform initialization work.
+     *
+     * @param pluginConfig plugin config.
+     */
+    void prepare(Config pluginConfig);
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 0033e3d9..1178c604 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.sink;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 
@@ -39,7 +40,8 @@ import java.util.Optional;
  * @param <AggregatedCommitInfoT> The aggregated commit message class, combine by {@link CommitInfoT}.
  *                                {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
  */
-public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
 
     /**
      * Set the row type info of sink row data. This method will be automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index cf48428a..7d1522f0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.source;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 
@@ -31,7 +32,8 @@ import java.io.Serializable;
  * @param <SplitT> The type of splits handled by the source.
  * @param <StateT> The type of checkpoint states.
  */
-public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable, PluginIdentifierInterface {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
+    extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
 
     /**
      * Get the boundedness of this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 5ae9aee3..f850c746 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import com.google.auto.service.AutoService;
 
 import java.util.List;
@@ -30,6 +32,7 @@ import java.util.List;
 @AutoService(SeaTunnelSink.class)
 public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
 
+    private Config pluginConfig;
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
 
     @Override
@@ -52,4 +55,9 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
     public String getPluginName() {
         return "Console";
     }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0e687517..03e606ed 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -29,11 +29,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSource.class)
 public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
 
+    private Config pluginConfig;
+
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
@@ -78,4 +82,9 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     public String getPluginName() {
         return "FakeSource";
     }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
index ef5a05b3..4966355f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
@@ -38,6 +38,8 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
     protected final FlinkEnvironment flinkEnvironment;
     protected final List<? extends Config> pluginConfigs;
     protected final List<T> plugins;
+    protected static final String ENGINE_TYPE = "seatunnel";
+    protected static final String PLUGIN_NAME = "plugin_name";
 
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                              List<? extends Config> pluginConfigs) {
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
index f1621533..e479f33a 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -45,6 +45,8 @@ import scala.Serializable;
 
 public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
 
+    private static final String PLUGIN_TYPE = "sink";
+
     protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                    List<? extends Config> pluginConfigs) {
         super(flinkEnvironment, pluginConfigs);
@@ -55,12 +57,12 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
         List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
-            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                "seatunnel",
-                "sink",
-                sinkConfig.getString("plugin_name"));
+            PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            return (SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>) sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            seaTunnelSink.prepare(sinkConfig);
+            return seaTunnelSink;
         }).collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
         return sinks;
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
index 2e264af4..ae96d027 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.flink.execution;
 
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -61,11 +62,11 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
         List<URL> jars = new ArrayList<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                "seatunnel",
-                "source",
-                sourceConfig.getString("plugin_name"));
+                "seatunnel", "source", sourceConfig.getString("plugin_name"));
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            sources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+            SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            seaTunnelSource.prepare(sourceConfig);
+            sources.add(new SeaTunnelParallelSource(seaTunnelSource));
         }
         flinkEnvironment.registerPlugin(jars);
         return sources;
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
index 508dec9a..1a12e98d 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
@@ -34,6 +34,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+
+    private static final String PLUGIN_TYPE = "transform";
+
     protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                         List<? extends Config> pluginConfigs) {
         super(flinkEnvironment, pluginConfigs);
@@ -45,10 +48,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
         List<URL> pluginJars = new ArrayList<>();
         List<FlinkStreamTransform> transforms = pluginConfigs.stream()
             .map(transformConfig -> {
-                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                    "seatunnel",
-                    "transform",
-                    transformConfig.getString("plugin_name"));
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
                 pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
                 FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);