You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/18 12:58:21 UTC
[incubator-seatunnel] branch api-draft updated: Add SeaTunnel plugin lifecycle (#1914)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new c0857382 Add SeaTunnel plugin lifecycle (#1914)
c0857382 is described below
commit c085738265bfc17cc7bc7b4e15a39a3e7f06d95a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 20:58:14 2022 +0800
Add SeaTunnel plugin lifecycle (#1914)
---
.../api/common/SeaTunnelPluginLifeCycle.java | 35 ++++++++++++++++++++++
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 4 ++-
.../seatunnel/api/source/SeaTunnelSource.java | 4 ++-
.../seatunnel/console/sink/ConsoleSink.java | 8 +++++
.../seatunnel/fake/source/FakeSource.java | 9 ++++++
.../execution/AbstractPluginExecuteProcessor.java | 2 ++
.../core/flink/execution/SinkExecuteProcessor.java | 12 ++++----
.../flink/execution/SourceExecuteProcessor.java | 9 +++---
.../flink/execution/TransformExecuteProcessor.java | 8 ++---
9 files changed, 76 insertions(+), 15 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
new file mode 100644
index 00000000..e0309e75
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+/**
+ * This interface defines the life cycle of a plugin: after a plugin is created,
+ * its prepare method is called to perform initialization.
+ */
+public interface SeaTunnelPluginLifeCycle {
+
+ /**
+ * Uses the pluginConfig to perform initialization.
+ *
+ * @param pluginConfig plugin config.
+ */
+ void prepare(Config pluginConfig);
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 0033e3d9..1178c604 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
@@ -39,7 +40,8 @@ import java.util.Optional;
* @param <AggregatedCommitInfoT> The aggregated commit message class, combine by {@link CommitInfoT}.
* {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
*/
-public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
/**
* Set the row type info of sink row data. This method will be automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index cf48428a..7d1522f0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.source;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
@@ -31,7 +32,8 @@ import java.io.Serializable;
* @param <SplitT> The type of splits handled by the source.
* @param <StateT> The type of checkpoint states.
*/
-public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable, PluginIdentifierInterface {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle {
/**
* Get the boundedness of this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 5ae9aee3..f850c746 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import com.google.auto.service.AutoService;
import java.util.List;
@@ -30,6 +32,7 @@ import java.util.List;
@AutoService(SeaTunnelSink.class)
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
+ private Config pluginConfig;
private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
@Override
@@ -52,4 +55,9 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
public String getPluginName() {
return "Console";
}
+
+ @Override
+ public void prepare(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0e687517..03e606ed 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -29,11 +29,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
+ private Config pluginConfig;
+
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
@@ -78,4 +82,9 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
public String getPluginName() {
return "FakeSource";
}
+
+ @Override
+ public void prepare(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
index ef5a05b3..4966355f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/AbstractPluginExecuteProcessor.java
@@ -38,6 +38,8 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
protected final FlinkEnvironment flinkEnvironment;
protected final List<? extends Config> pluginConfigs;
protected final List<T> plugins;
+ protected static final String ENGINE_TYPE = "seatunnel";
+ protected static final String PLUGIN_NAME = "plugin_name";
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
index f1621533..e479f33a 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -45,6 +45,8 @@ import scala.Serializable;
public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
+ private static final String PLUGIN_TYPE = "sink";
+
protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
super(flinkEnvironment, pluginConfigs);
@@ -55,12 +57,12 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "sink",
- sinkConfig.getString("plugin_name"));
+ PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- return (SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>) sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+ sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ seaTunnelSink.prepare(sinkConfig);
+ return seaTunnelSink;
}).collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
return sinks;
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
index 2e264af4..ae96d027 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.flink.execution;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -61,11 +62,11 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
List<URL> jars = new ArrayList<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "source",
- sourceConfig.getString("plugin_name"));
+ "seatunnel", "source", sourceConfig.getString("plugin_name"));
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- sources.add(new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+ SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ seaTunnelSource.prepare(sourceConfig);
+ sources.add(new SeaTunnelParallelSource(seaTunnelSource));
}
flinkEnvironment.registerPlugin(jars);
return sources;
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
index 508dec9a..1a12e98d 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/TransformExecuteProcessor.java
@@ -34,6 +34,9 @@ import java.util.List;
import java.util.stream.Collectors;
public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+
+ private static final String PLUGIN_TYPE = "transform";
+
protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
super(flinkEnvironment, pluginConfigs);
@@ -45,10 +48,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
List<URL> pluginJars = new ArrayList<>();
List<FlinkStreamTransform> transforms = pluginConfigs.stream()
.map(transformConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "transform",
- transformConfig.getString("plugin_name"));
+ PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
pluginInstance.setConfig(transformConfig);