You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 08:29:36 UTC
[incubator-seatunnel] branch api-draft updated: Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 1844a7a1 Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)
1844a7a1 is described below
commit 1844a7a175066f1a7fb5052a622c791968dc1844
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon May 16 16:29:30 2022 +0800
Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)
* Add SeaTunnelPluginDiscovery to load seatunnel new api plugin
---
.../PluginIdentifierInterface.java} | 9 ++-
.../api/serialization/DefaultSerializer.java | 2 +-
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 10 +--
.../seatunnel/api/source/SeaTunnelSource.java | 3 +-
.../seatunnel/api/table/factory/Factory.java | 1 +
seatunnel-common/pom.xml | 5 ++
.../seatunnel/common/utils/SerializationUtils.java | 8 +--
seatunnel-connectors/plugin-mapping.properties | 5 ++
.../seatunnel/console/sink/ConsoleSink.java | 5 ++
.../seatunnel/fake/source/FakeSource.java | 5 ++
.../command/SeaTunnelApiTaskExecuteCommand.java | 74 +++++++++++++---------
seatunnel-plugin-discovery/pom.xml | 6 ++
.../plugin/discovery/AbstractPluginDiscovery.java | 19 ++++--
.../SeaTunnelFlinkTransformPluginDiscovery.java | 25 ++++----
.../seatunnel/SeaTunnelSinkPluginDiscovery.java | 26 ++++----
.../seatunnel/SeaTunnelSourcePluginDiscovery.java | 25 ++++----
.../SeaTunnelSparkTransformPluginDiscovery.java | 25 ++++----
17 files changed, 154 insertions(+), 99 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
similarity index 88%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
index cd35609b..edc56d50 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.common;
/**
- * This is the SPI interface.
+ * todo: unify with Plugin
*/
-public interface Factory {
-
+public interface PluginIdentifierInterface {
/**
* Returns a unique identifier among same factory interfaces.
*
@@ -29,5 +28,5 @@ public interface Factory {
* kafka}). If multiple factories exist for different versions, a version should be appended
* using "-" (e.g. {@code elasticsearch-7}).
*/
- String factoryIdentifier();
+ String getPluginName();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index bf2c6358..1705d6f7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.io.InvalidClassException;
import java.io.Serializable;
-public class DefaultSerializer<T> implements Serializer<T> {
+public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
@Override
public byte[] serialize(T obj) throws IOException {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 00427e63..4c71f3b5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.api.sink;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.serialization.Serializer;
import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Optional;
* @param <AggregatedCommitInfoT> The aggregated commit message class, combine by {@link CommitInfoT}.
* {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
*/
-public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
/**
* This method will be called to creat {@link SinkWriter}
@@ -60,7 +60,7 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
* @return Serializer of {@link StateT}
*/
default Optional<Serializer<StateT>> getWriterStateSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ return Optional.empty();
}
/**
@@ -79,7 +79,7 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
* @return Serializer of {@link CommitInfoT}
*/
default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ return Optional.empty();
}
/**
@@ -98,6 +98,6 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
* @return Serializer of {@link AggregatedCommitInfoT}
*/
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ return Optional.empty();
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7244891a..c1f145bc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.source;
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.serialization.Serializer;
import java.io.Serializable;
@@ -29,7 +30,7 @@ import java.io.Serializable;
* @param <SplitT> The type of splits handled by the source.
* @param <StateT> The type of checkpoint states.
*/
-public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable, PluginIdentifierInterface {
/**
* Get the boundedness of this source.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index cd35609b..098d3d29 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;
/**
+ * todo: use PluginIdentifier.
* This is the SPI interface.
*/
public interface Factory {
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index 99e1bb19..24e7dd32 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -54,6 +54,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index a1d0aa96..43dead16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -26,24 +26,24 @@ public class SerializationUtils {
public static String objectToString(Serializable obj) {
if (obj != null) {
- return Base64.encodeBase64String(SerializationUtils.serialize(obj));
+ return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
}
return null;
}
public static <T extends Serializable> T stringToObject(String str) {
if (StringUtils.isNotEmpty(str)) {
- return SerializationUtils.deserialize(Base64.decodeBase64(str));
+ return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
}
return null;
}
public static <T extends Serializable> byte[] serialize(T obj) {
- return SerializationUtils.serialize(obj);
+ return org.apache.commons.lang3.SerializationUtils.serialize(obj);
}
public static <T extends Serializable> T deserialize(byte[] bytes) {
- return SerializationUtils.deserialize(bytes);
+ return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
}
}
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 58536d02..92527139 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -82,3 +82,8 @@ spark.sink.MongoDB = seatunnel-connector-spark-mongodb
spark.sink.Phoenix = seatunnel-connector-spark-phoenix
spark.sink.Redis = seatunnel-connector-spark-redis
spark.sink.TiDB = seatunnel-connector-spark-tidb
+
+# SeaTunnel new API
+
+seatunnel.source.FakeSource = seatunnel-connectors-seatunnel-fake
+seatunnel.sink.Console = seatunnel-connectors-seatunnel-console
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index ea7f69cb..b3cb26ab 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -39,4 +39,9 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
SinkWriter.Context context, List<ConsoleState> states) {
return restoreWriter(context, states);
}
+
+ @Override
+ public String getPluginName() {
+ return "Console";
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 9c12e4da..074816b6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -63,4 +63,9 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
public Serializer<FakeState> getEnumeratorStateSerializer() {
return new DefaultSerializer<>();
}
+
+ @Override
+ public String getPluginName() {
+ return "FakeSource";
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index e767df5f..e563a3d9 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,9 +17,6 @@
package org.apache.seatunnel.core.flink.command;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.command.Command;
@@ -29,24 +26,30 @@ import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.common.collect.Lists;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URL;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.ServiceLoader;
+import java.util.List;
/**
+ * todo: do we need to move these classes to a new module? This may cause version conflicts with the old Flink version.
* This command is used to execute the Flink job by SeaTunnel new API.
*/
public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
@@ -65,13 +68,17 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
+ SeaTunnelParallelSource source = getSource(config);
// todo: add basic type
- SeaTunnelParallelSource source = getSource();
- Sink<WrappedRow, Object, Object, Object> flinkSink = getSink();
- // execute the flink job
+ Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
+
FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+ registerPlugins(flinkEnvironment);
+
StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+ // support multiple sources/sinks
DataStreamSource<WrappedRow> dataStream = streamExecutionEnvironment.addSource(source);
+ // todo: add transform
dataStream.sinkTo(flinkSink);
try {
streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
@@ -80,34 +87,43 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
}
}
- private SeaTunnelParallelSource getSource() {
- return new SeaTunnelParallelSource(loadSourcePlugin());
+ private SeaTunnelParallelSource getSource(Config config) {
+ PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
+ // todo: use FactoryUtils to load the plugin
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+ return new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
}
- private Sink<WrappedRow, Object, Object, Object> getSink() {
- SeaTunnelSink<SeaTunnelRow, Object, Object, Object> sink = loadSinkPlugin();
+ private Sink<WrappedRow, Object, Object, Object> getSink(Config config) {
+ PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
- return flinkSinkConverter.convert(sink, Collections.emptyMap());
+ return flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier), Collections.emptyMap());
}
- private <T, SplitT extends SourceSplit, StateT> SeaTunnelSource<T, SplitT, StateT> loadSourcePlugin() {
- // todo: use FactoryUtils to load the plugin
- ServiceLoader<SeaTunnelSource> serviceLoader = ServiceLoader.load(SeaTunnelSource.class);
- Iterator<SeaTunnelSource> iterator = serviceLoader.iterator();
- if (iterator.hasNext()) {
- return iterator.next();
- }
- throw new IllegalArgumentException("Cannot find the plugin.");
+ private List<URL> getSourPluginJars(PluginIdentifier pluginIdentifier) {
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+ return sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
}
- private <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> loadSinkPlugin() {
- // todo: use FactoryUtils to load the plugin
- ServiceLoader<SeaTunnelSink> serviceLoader = ServiceLoader.load(SeaTunnelSink.class);
- Iterator<SeaTunnelSink> iterator = serviceLoader.iterator();
- if (iterator.hasNext()) {
- return iterator.next();
- }
- throw new IllegalArgumentException("Cannot find the plugin.");
+ private List<URL> getSinkPluginJars(PluginIdentifier pluginIdentifier) {
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+ return sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+ }
+
+ private PluginIdentifier getSourcePluginIdentifier() {
+ return PluginIdentifier.of("seatunnel", "source", "FakeSource");
+ }
+
+ private PluginIdentifier getSinkPluginIdentifier() {
+ return PluginIdentifier.of("seatunnel", "sink", "Console");
+ }
+
+ private void registerPlugins(FlinkEnvironment flinkEnvironment) {
+ List<URL> pluginJars = new ArrayList<>();
+ pluginJars.addAll(getSourPluginJars(getSourcePluginIdentifier()));
+ pluginJars.addAll(getSinkPluginJars(getSinkPluginIdentifier()));
+ flinkEnvironment.registerPlugin(pluginJars);
}
private FlinkEnvironment getFlinkEnvironment(Config config) {
diff --git a/seatunnel-plugin-discovery/pom.xml b/seatunnel-plugin-discovery/pom.xml
index 378e0670..e803b0e3 100644
--- a/seatunnel-plugin-discovery/pom.xml
+++ b/seatunnel-plugin-discovery/pom.xml
@@ -26,6 +26,12 @@
<artifactId>seatunnel-plugin-discovery</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-flink</artifactId>
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 8c90dbab..cc8427d9 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.plugin.discovery;
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
@@ -159,10 +160,20 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
}
ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
for (T t : serviceLoader) {
- // todo: add plugin identifier interface to support new api interface.
- Plugin<?> pluginInstance = (Plugin<?>) t;
- if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
- return Optional.of((T) pluginInstance);
+ if (t instanceof Plugin) {
+ // old api
+ Plugin<?> pluginInstance = (Plugin<?>) t;
+ if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+ return Optional.of((T) pluginInstance);
+ }
+ } else if (t instanceof PluginIdentifierInterface) {
+ // new api
+ PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
+ if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+ return Optional.of((T) pluginIdentifierInstance);
+ }
+ } else {
+ throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
}
}
return Optional.empty();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
similarity index 59%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
index cd35609b..2b5620f4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
/**
- * This is the SPI interface.
+ * Discovery for the SeaTunnel Flink transform.
*/
-public interface Factory {
+public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseFlinkTransform> {
+
+ public SeaTunnelFlinkTransformPluginDiscovery() {
+ super("seatunnel");
+ }
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+ @Override
+ protected Class<BaseFlinkTransform> getPluginBaseClass() {
+ return BaseFlinkTransform.class;
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
similarity index 62%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index cd35609b..d3286c54 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class SeaTunnelSinkPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelSink> {
+
+ public SeaTunnelSinkPluginDiscovery() {
+ super("seatunnel");
+ }
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+ @Override
+ protected Class<SeaTunnelSink> getPluginBaseClass() {
+ return SeaTunnelSink.class;
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
similarity index 62%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index cd35609b..8618e037 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class SeaTunnelSourcePluginDiscovery extends AbstractPluginDiscovery<SeaTunnelSource> {
+ public SeaTunnelSourcePluginDiscovery() {
+ super("seatunnel");
+ }
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+ @Override
+ protected Class<SeaTunnelSource> getPluginBaseClass() {
+ return SeaTunnelSource.class;
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
similarity index 61%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
index cd35609b..bfc5358e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+
+public class SeaTunnelSparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseSparkTransform> {
+ public SeaTunnelSparkTransformPluginDiscovery() {
+ super("seatunnel");
+ }
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+ @Override
+ protected Class<BaseSparkTransform> getPluginBaseClass() {
+ return BaseSparkTransform.class;
+ }
}