You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 08:29:36 UTC

[incubator-seatunnel] branch api-draft updated: Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 1844a7a1 Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)
1844a7a1 is described below

commit 1844a7a175066f1a7fb5052a622c791968dc1844
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon May 16 16:29:30 2022 +0800

    Add SeaTunnelPluginDiscovery to load seatunnel new api plugin (#1889)
    
    * Add SeaTunnelPluginDiscovery to load seatunnel new api plugin
---
 .../PluginIdentifierInterface.java}                |  9 ++-
 .../api/serialization/DefaultSerializer.java       |  2 +-
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   | 10 +--
 .../seatunnel/api/source/SeaTunnelSource.java      |  3 +-
 .../seatunnel/api/table/factory/Factory.java       |  1 +
 seatunnel-common/pom.xml                           |  5 ++
 .../seatunnel/common/utils/SerializationUtils.java |  8 +--
 seatunnel-connectors/plugin-mapping.properties     |  5 ++
 .../seatunnel/console/sink/ConsoleSink.java        |  5 ++
 .../seatunnel/fake/source/FakeSource.java          |  5 ++
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 74 +++++++++++++---------
 seatunnel-plugin-discovery/pom.xml                 |  6 ++
 .../plugin/discovery/AbstractPluginDiscovery.java  | 19 ++++--
 .../SeaTunnelFlinkTransformPluginDiscovery.java    | 25 ++++----
 .../seatunnel/SeaTunnelSinkPluginDiscovery.java    | 26 ++++----
 .../seatunnel/SeaTunnelSourcePluginDiscovery.java  | 25 ++++----
 .../SeaTunnelSparkTransformPluginDiscovery.java    | 25 ++++----
 17 files changed, 154 insertions(+), 99 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
similarity index 88%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
index cd35609b..edc56d50 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifierInterface.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.common;
 
 /**
- * This is the SPI interface.
+ * todo: unified with Plugin
  */
-public interface Factory {
-
+public interface PluginIdentifierInterface {
     /**
      * Returns a unique identifier among same factory interfaces.
      *
@@ -29,5 +28,5 @@ public interface Factory {
      * kafka}). If multiple factories exist for different versions, a version should be appended
      * using "-" (e.g. {@code elasticsearch-7}).
      */
-    String factoryIdentifier();
+    String getPluginName();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index bf2c6358..1705d6f7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.InvalidClassException;
 import java.io.Serializable;
 
-public class DefaultSerializer<T> implements Serializer<T> {
+public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
 
     @Override
     public byte[] serialize(T obj) throws IOException {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 00427e63..4c71f3b5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.api.sink;
 
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.serialization.Serializer;
 
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Optional;
  * @param <AggregatedCommitInfoT> The aggregated commit message class, combine by {@link CommitInfoT}.
  *                                {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
  */
-public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
 
     /**
      * This method will be called to creat {@link SinkWriter}
@@ -60,7 +60,7 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
      * @return Serializer of {@link StateT}
      */
     default Optional<Serializer<StateT>> getWriterStateSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+        return Optional.empty();
     }
 
     /**
@@ -79,7 +79,7 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
      * @return Serializer of {@link CommitInfoT}
      */
     default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+        return Optional.empty();
     }
 
     /**
@@ -98,6 +98,6 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
      * @return Serializer of {@link AggregatedCommitInfoT}
      */
     default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+        return Optional.empty();
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 7244891a..c1f145bc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.source;
 
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.serialization.Serializer;
 
 import java.io.Serializable;
@@ -29,7 +30,7 @@ import java.io.Serializable;
  * @param <SplitT> The type of splits handled by the source.
  * @param <StateT> The type of checkpoint states.
  */
-public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable {
+public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> extends Serializable, PluginIdentifierInterface {
 
     /**
      * Get the boundedness of this source.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index cd35609b..098d3d29 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.table.factory;
 
 /**
+ * todo: use PluginIdentifier.
  * This is the SPI interface.
  */
 public interface Factory {
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index 99e1bb19..24e7dd32 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -54,6 +54,11 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index a1d0aa96..43dead16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -26,24 +26,24 @@ public class SerializationUtils {
 
     public static String objectToString(Serializable obj) {
         if (obj != null) {
-            return Base64.encodeBase64String(SerializationUtils.serialize(obj));
+            return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
         }
         return null;
     }
 
     public static <T extends Serializable> T stringToObject(String str) {
         if (StringUtils.isNotEmpty(str)) {
-            return SerializationUtils.deserialize(Base64.decodeBase64(str));
+            return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
         }
         return null;
     }
 
     public static <T extends Serializable> byte[] serialize(T obj) {
-        return SerializationUtils.serialize(obj);
+        return org.apache.commons.lang3.SerializationUtils.serialize(obj);
     }
 
     public static <T extends Serializable> T deserialize(byte[] bytes) {
-        return SerializationUtils.deserialize(bytes);
+        return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
     }
 
 }
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 58536d02..92527139 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -82,3 +82,8 @@ spark.sink.MongoDB = seatunnel-connector-spark-mongodb
 spark.sink.Phoenix = seatunnel-connector-spark-phoenix
 spark.sink.Redis = seatunnel-connector-spark-redis
 spark.sink.TiDB = seatunnel-connector-spark-tidb
+
+# SeaTunnel new API
+
+seatunnel.source.FakeSource = seatunnel-connectors-seatunnel-fake
+seatunnel.sink.Console = seatunnel-connectors-seatunnel-console
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index ea7f69cb..b3cb26ab 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -39,4 +39,9 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
         SinkWriter.Context context, List<ConsoleState> states) {
         return restoreWriter(context, states);
     }
+
+    @Override
+    public String getPluginName() {
+        return "Console";
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 9c12e4da..074816b6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -63,4 +63,9 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     public Serializer<FakeState> getEnumeratorStateSerializer() {
         return new DefaultSerializer<>();
     }
+
+    @Override
+    public String getPluginName() {
+        return "FakeSource";
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index e767df5f..e563a3d9 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -17,9 +17,6 @@
 
 package org.apache.seatunnel.core.flink.command;
 
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.command.Command;
@@ -29,24 +26,30 @@ import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
 import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
 import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URL;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.ServiceLoader;
+import java.util.List;
 
 /**
+ * todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
  * This command is used to execute the Flink job by SeaTunnel new API.
  */
 public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
@@ -65,13 +68,17 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder(configFile).getConfig();
+        SeaTunnelParallelSource source = getSource(config);
         // todo: add basic type
-        SeaTunnelParallelSource source = getSource();
-        Sink<WrappedRow, Object, Object, Object> flinkSink = getSink();
-        // execute the flink job
+        Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
+
         FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+        registerPlugins(flinkEnvironment);
+
         StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+        // support multiple sources/sink
         DataStreamSource<WrappedRow> dataStream = streamExecutionEnvironment.addSource(source);
+        // todo: add transform
         dataStream.sinkTo(flinkSink);
         try {
             streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
@@ -80,34 +87,43 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs>
         }
     }
 
-    private SeaTunnelParallelSource getSource() {
-        return new SeaTunnelParallelSource(loadSourcePlugin());
+    private SeaTunnelParallelSource getSource(Config config) {
+        PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
+        // todo: use FactoryUtils to load the plugin
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        return new SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
     }
 
-    private Sink<WrappedRow, Object, Object, Object> getSink() {
-        SeaTunnelSink<SeaTunnelRow, Object, Object, Object> sink = loadSinkPlugin();
+    private Sink<WrappedRow, Object, Object, Object> getSink(Config config) {
+        PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
-        return flinkSinkConverter.convert(sink, Collections.emptyMap());
+        return flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier), Collections.emptyMap());
     }
 
-    private <T, SplitT extends SourceSplit, StateT> SeaTunnelSource<T, SplitT, StateT> loadSourcePlugin() {
-        // todo: use FactoryUtils to load the plugin
-        ServiceLoader<SeaTunnelSource> serviceLoader = ServiceLoader.load(SeaTunnelSource.class);
-        Iterator<SeaTunnelSource> iterator = serviceLoader.iterator();
-        if (iterator.hasNext()) {
-            return iterator.next();
-        }
-        throw new IllegalArgumentException("Cannot find the plugin.");
+    private List<URL> getSourPluginJars(PluginIdentifier pluginIdentifier) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        return sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
     }
 
-    private <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> loadSinkPlugin() {
-        // todo: use FactoryUtils to load the plugin
-        ServiceLoader<SeaTunnelSink> serviceLoader = ServiceLoader.load(SeaTunnelSink.class);
-        Iterator<SeaTunnelSink> iterator = serviceLoader.iterator();
-        if (iterator.hasNext()) {
-            return iterator.next();
-        }
-        throw new IllegalArgumentException("Cannot find the plugin.");
+    private List<URL> getSinkPluginJars(PluginIdentifier pluginIdentifier) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        return sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+    }
+
+    private PluginIdentifier getSourcePluginIdentifier() {
+        return PluginIdentifier.of("seatunnel", "source", "FakeSource");
+    }
+
+    private PluginIdentifier getSinkPluginIdentifier() {
+        return PluginIdentifier.of("seatunnel", "sink", "Console");
+    }
+
+    private void registerPlugins(FlinkEnvironment flinkEnvironment) {
+        List<URL> pluginJars = new ArrayList<>();
+        pluginJars.addAll(getSourPluginJars(getSourcePluginIdentifier()));
+        pluginJars.addAll(getSinkPluginJars(getSinkPluginIdentifier()));
+        flinkEnvironment.registerPlugin(pluginJars);
     }
 
     private FlinkEnvironment getFlinkEnvironment(Config config) {
diff --git a/seatunnel-plugin-discovery/pom.xml b/seatunnel-plugin-discovery/pom.xml
index 378e0670..e803b0e3 100644
--- a/seatunnel-plugin-discovery/pom.xml
+++ b/seatunnel-plugin-discovery/pom.xml
@@ -26,6 +26,12 @@
     <artifactId>seatunnel-plugin-discovery</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api-flink</artifactId>
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 8c90dbab..cc8427d9 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery;
 
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.config.Common;
 
@@ -159,10 +160,20 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
         }
         ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
         for (T t : serviceLoader) {
-            // todo: add plugin identifier interface to support new api interface.
-            Plugin<?> pluginInstance = (Plugin<?>) t;
-            if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
-                return Optional.of((T) pluginInstance);
+            if (t instanceof Plugin) {
+                // old api
+                Plugin<?> pluginInstance = (Plugin<?>) t;
+                if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                    return Optional.of((T) pluginInstance);
+                }
+            } else if (t instanceof PluginIdentifierInterface) {
+                // new api
+                PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
+                if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                    return Optional.of((T) pluginIdentifierInstance);
+                }
+            } else {
+                throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
             }
         }
         return Optional.empty();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
similarity index 59%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
index cd35609b..2b5620f4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
 /**
- * This is the SPI interface.
+ * Discovery for the SeaTunnel Flink transform.
  */
-public interface Factory {
+public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseFlinkTransform> {
+
+    public SeaTunnelFlinkTransformPluginDiscovery() {
+        super("seatunnel");
+    }
 
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+    @Override
+    protected Class<BaseFlinkTransform> getPluginBaseClass() {
+        return BaseFlinkTransform.class;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
similarity index 62%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index cd35609b..d3286c54 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
 
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class SeaTunnelSinkPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelSink> {
+
+    public SeaTunnelSinkPluginDiscovery() {
+        super("seatunnel");
+    }
 
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+    @Override
+    protected Class<SeaTunnelSink> getPluginBaseClass() {
+        return SeaTunnelSink.class;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
similarity index 62%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index cd35609b..8618e037 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
 
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class SeaTunnelSourcePluginDiscovery extends AbstractPluginDiscovery<SeaTunnelSource> {
+    public SeaTunnelSourcePluginDiscovery() {
+        super("seatunnel");
+    }
 
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+    @Override
+    protected Class<SeaTunnelSource> getPluginBaseClass() {
+        return SeaTunnelSource.class;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
similarity index 61%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
index cd35609b..bfc5358e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
 
-/**
- * This is the SPI interface.
- */
-public interface Factory {
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+
+public class SeaTunnelSparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseSparkTransform> {
+    public SeaTunnelSparkTransformPluginDiscovery() {
+        super("seatunnel");
+    }
 
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+    @Override
+    protected Class<BaseSparkTransform> getPluginBaseClass() {
+        return BaseSparkTransform.class;
+    }
 }