You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/15 11:56:09 UTC

[incubator-seatunnel] branch api-draft updated: Add plugin discovery module (#1881)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 5f79f45d Add plugin discovery module (#1881)
5f79f45d is described below

commit 5f79f45d22bbedc4a0f95ca1f251a217a7b4a23a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun May 15 19:56:03 2022 +0800

    Add plugin discovery module (#1881)
    
    * Add plugin discovery module
    * Remove PluginFactory
---
 pom.xml                                            |   1 +
 .../org/apache/seatunnel/common/config/Common.java |   7 +-
 seatunnel-core/seatunnel-core-base/pom.xml         |   6 +
 ...nContext.java => AbstractExecutionContext.java} |  45 ++--
 .../seatunnel/core/base/config/ConfigBuilder.java  |   1 -
 .../core/base/config/ExecutionFactory.java         |   4 +-
 .../seatunnel/core/base/config/PluginFactory.java  | 241 ---------------------
 .../flink/command/FlinkApiTaskExecuteCommand.java  |   4 +-
 .../core/flink/config/FlinkApiConfigChecker.java   |  10 +-
 .../core/flink/config/FlinkExecutionContext.java   | 104 +++++++++
 .../apache/seatunnel/core/spark/SparkStarter.java  |   9 +-
 .../spark/command/SparkTaskExecuteCommand.java     |   4 +-
 .../core/spark/config/SparkApiConfigChecker.java   |  12 +-
 .../core/spark/config/SparkExecutionContext.java   | 103 +++++++++
 .../pom.xml                                        |  30 +--
 .../plugin/discovery/AbstractPluginDiscovery.java  | 170 +++++++++++++++
 .../plugin/discovery/PluginDiscovery.java          |  70 ++++++
 .../plugin/discovery/PluginIdentifier.java         |  89 ++++++++
 .../discovery/flink/FlinkSinkPluginDiscovery.java  |  33 +++
 .../flink/FlinkSourcePluginDiscovery.java          |  32 +++
 .../flink/FlinkTransformPluginDiscovery.java       |  42 ++++
 .../discovery/spark/SparkSinkPluginDiscovery.java  |  33 +++
 .../spark/SparkSourcePluginDiscovery.java          |  33 +++
 .../spark/SparkTransformPluginDiscovery.java       |  45 ++++
 24 files changed, 821 insertions(+), 307 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9a5382b8..893a8bd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
         <module>seatunnel-e2e</module>
         <module>seatunnel-api</module>
         <module>seatunnel-translation</module>
+        <module>seatunnel-plugin-discovery</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 5280250f..dd74c022 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -32,8 +32,13 @@ public class Common {
         throw new IllegalStateException("Utility class");
     }
 
+    /**
+     * Used to set the size when create a new collection(just to pass the checkstyle).
+     */
+    public static final int COLLECTION_SIZE = 16;
+
     private static final List<String> ALLOWED_MODES = Arrays.stream(DeployMode.values())
-            .map(DeployMode::getName).collect(Collectors.toList());
+        .map(DeployMode::getName).collect(Collectors.toList());
 
     private static Optional<String> MODE = Optional.empty();
 
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml b/seatunnel-core/seatunnel-core-base/pom.xml
index 1248fb91..77bd13d3 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-core/seatunnel-core-base/pom.xml
@@ -50,6 +50,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-plugin-discovery</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.beust</groupId>
             <artifactId>jcommander</artifactId>
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
similarity index 59%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index 7e91a368..43ef94c4 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -22,37 +22,35 @@ import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The ExecutionContext contains all configuration needed to run the job.
  *
  * @param <ENVIRONMENT> environment type.
  */
-public class ExecutionContext<ENVIRONMENT extends RuntimeEnv> {
+public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {
 
     private final Config config;
     private final EngineType engine;
 
     private final ENVIRONMENT environment;
     private final JobMode jobMode;
-    private final List<BaseSource<ENVIRONMENT>> sources;
-    private final List<BaseTransform<ENVIRONMENT>> transforms;
-    private final List<BaseSink<ENVIRONMENT>> sinks;
 
-    public ExecutionContext(Config config, EngineType engine) {
+    public AbstractExecutionContext(Config config, EngineType engine) {
         this.config = config;
         this.engine = engine;
         this.environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
         this.jobMode = environment.getJobMode();
-        PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config, engine);
-        this.environment.registerPlugin(pluginFactory.getPluginJarPaths());
-        this.sources = pluginFactory.createPlugins(PluginType.SOURCE);
-        this.transforms = pluginFactory.createPlugins(PluginType.TRANSFORM);
-        this.sinks = pluginFactory.createPlugins(PluginType.SINK);
     }
 
     public Config getRootConfig() {
@@ -71,15 +69,26 @@ public class ExecutionContext<ENVIRONMENT extends RuntimeEnv> {
         return jobMode;
     }
 
-    public List<BaseSource<ENVIRONMENT>> getSources() {
-        return sources;
-    }
+    public abstract List<BaseSource<ENVIRONMENT>> getSources();
 
-    public List<BaseTransform<ENVIRONMENT>> getTransforms() {
-        return transforms;
-    }
+    public abstract List<BaseTransform<ENVIRONMENT>> getTransforms();
+
+    public abstract List<BaseSink<ENVIRONMENT>> getSinks();
+
+    public abstract List<URL> getPluginJars();
 
-    public List<BaseSink<ENVIRONMENT>> getSinks() {
-        return sinks;
+    @SuppressWarnings("checkstyle:Indentation")
+    protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
+        return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, Stream<PluginIdentifier>>() {
+            @Override
+            public Stream<PluginIdentifier> apply(PluginType pluginType) {
+                List<? extends Config> configList = config.getConfigList(pluginType.getType());
+                return configList.stream()
+                    .map(pluginConfig -> PluginIdentifier
+                        .of(engine.getEngine(),
+                            pluginType.getType(),
+                            pluginConfig.getString("plugin_name")));
+            }
+        }).collect(Collectors.toList());
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 758ec8c8..b3f7eaff 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -32,7 +32,6 @@ import java.nio.file.Path;
 /**
  * Used to build the {@link  Config} from file.
  *
- * @param <ENVIRONMENT> environment type.
  */
 public class ConfigBuilder {
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
index 62b807d1..0c4fd178 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
@@ -42,9 +42,9 @@ public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);
 
-    public ExecutionContext<ENVIRONMENT> executionContext;
+    public AbstractExecutionContext<ENVIRONMENT> executionContext;
 
-    public ExecutionFactory(ExecutionContext<ENVIRONMENT> executionContext) {
+    public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
         this.executionContext = executionContext;
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
deleted file mode 100644
index 82b51ef1..00000000
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.base.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
-import java.util.stream.Collectors;
-
-/**
- * Used to load the plugins.
- *
- * @param <ENVIRONMENT> environment
- */
-public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PluginFactory.class);
-    private final Config config;
-    private final EngineType engineType;
-    private static final Map<EngineType, Map<PluginType, Class<?>>> PLUGIN_BASE_CLASS_MAP;
-
-    private static final String PLUGIN_NAME_KEY = "plugin_name";
-    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-
-    private final List<URL> pluginJarPaths;
-    private final ClassLoader defaultClassLoader;
-
-    static {
-        PLUGIN_BASE_CLASS_MAP = new HashMap<>();
-        Map<PluginType, Class<?>> sparkBaseClassMap = new HashMap<>();
-        sparkBaseClassMap.put(PluginType.SOURCE, BaseSparkSource.class);
-        sparkBaseClassMap.put(PluginType.TRANSFORM, BaseSparkTransform.class);
-        sparkBaseClassMap.put(PluginType.SINK, BaseSparkSink.class);
-        PLUGIN_BASE_CLASS_MAP.put(EngineType.SPARK, sparkBaseClassMap);
-
-        Map<PluginType, Class<?>> flinkBaseClassMap = new HashMap<>();
-        flinkBaseClassMap.put(PluginType.SOURCE, BaseFlinkSource.class);
-        flinkBaseClassMap.put(PluginType.TRANSFORM, BaseFlinkTransform.class);
-        flinkBaseClassMap.put(PluginType.SINK, BaseFlinkSink.class);
-        PLUGIN_BASE_CLASS_MAP.put(EngineType.FLINK, flinkBaseClassMap);
-    }
-
-    public PluginFactory(Config config, EngineType engineType) {
-        this.config = config;
-        this.engineType = engineType;
-        this.pluginJarPaths = searchPluginJar();
-        this.defaultClassLoader = initClassLoaderWithPaths(this.pluginJarPaths);
-    }
-
-    private ClassLoader initClassLoaderWithPaths(List<URL> pluginJarPaths) {
-        return new URLClassLoader(pluginJarPaths.toArray(new URL[0]),
-                Thread.currentThread().getContextClassLoader());
-    }
-
-    @Nonnull
-    private List<URL> searchPluginJar() {
-
-        File pluginDir = Common.connectorJarDir(this.engineType.getEngine()).toFile();
-        if (!pluginDir.exists() || pluginDir.listFiles() == null) {
-            return new ArrayList<>();
-        }
-        Config pluginMapping = ConfigFactory
-                .parseFile(new File(getPluginMappingPath()))
-                .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                .resolveWith(ConfigFactory.systemProperties(),
-                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
-        File[] plugins =
-                Arrays.stream(pluginDir.listFiles()).filter(f -> f.getName().endsWith(".jar")).toArray(File[]::new);
-
-        return Arrays.stream(PluginType.values()).filter(type -> !PluginType.TRANSFORM.equals(type))
-                .flatMap(type -> {
-                    List<URL> pluginList = new ArrayList<>();
-                    List<? extends Config> configList = config.getConfigList(type.getType());
-                    configList.forEach(pluginConfig -> {
-
-                        if (containPluginMappingValue(pluginMapping, type, pluginConfig.getString(PLUGIN_NAME_KEY))) {
-                            try {
-                                for (File plugin : plugins) {
-                                    if (plugin.getName().startsWith(getPluginMappingValue(pluginMapping, type,
-                                            pluginConfig.getString(PLUGIN_NAME_KEY)))) {
-                                        pluginList.add(plugin.toURI().toURL());
-                                        break;
-                                    }
-                                }
-                            } catch (MalformedURLException e) {
-                                LOGGER.warn("can get plugin url", e);
-                            }
-                        } else {
-                            throw new IllegalArgumentException(String.format("can't find connector %s in " +
-                                            "%s. If you add connector to connectors dictionary, please modify this " +
-                                            "file.", getPluginMappingKey(type, pluginConfig.getString(PLUGIN_NAME_KEY)),
-                                    getPluginMappingPath()));
-                        }
-
-                    });
-                    return pluginList.stream();
-                }).collect(Collectors.toList());
-    }
-
-    public List<URL> getPluginJarPaths() {
-        return this.pluginJarPaths;
-    }
-
-    private String getPluginMappingPath() {
-        return Common.connectorDir() + "/" + PLUGIN_MAPPING_FILE;
-    }
-
-    private String getPluginMappingKey(PluginType type, String pluginName) {
-        return this.engineType.getEngine() + "." + type.getType() + "." + pluginName;
-
-    }
-
-    private String getPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
-        return pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).getString(pluginName);
-    }
-
-    private boolean containPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
-        if (pluginMapping.hasPath(this.engineType.getEngine())) {
-            Config engine = pluginMapping.getConfig(this.engineType.getEngine());
-            if (engine.hasPath(type.getType())) {
-                Config plugins = engine.getConfig(type.getType());
-                return plugins.hasPath(pluginName);
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Create the plugins by plugin type.
-     *
-     * @param type plugin type
-     * @param <T>  plugin
-     * @return plugin list.
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType type) {
-        Objects.requireNonNull(type, "PluginType can not be null when create plugins!");
-        List<T> basePluginList = new ArrayList<>();
-        List<? extends Config> configList = config.getConfigList(type.getType());
-        configList.forEach(plugin -> {
-            try {
-                T t = (T) createPluginInstanceIgnoreCase(type, plugin.getString(PLUGIN_NAME_KEY), this.defaultClassLoader);
-                t.setConfig(plugin);
-                basePluginList.add(t);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        return basePluginList;
-    }
-
-    /**
-     * create plugin class instance, ignore case.
-     **/
-    @SuppressWarnings("unchecked")
-    private Plugin<?> createPluginInstanceIgnoreCase(PluginType pluginType, String pluginName,
-                                                     ClassLoader classLoader) throws Exception {
-        Class<Plugin<?>> pluginBaseClass = (Class<Plugin<?>>) getPluginBaseClass(engineType, pluginType);
-
-        if (pluginName.split("\\.").length != 1) {
-            // canonical class name
-            Class<Plugin<?>> pluginClass = (Class<Plugin<?>>) Class.forName(pluginName);
-            if (pluginClass.isAssignableFrom(pluginBaseClass)) {
-                throw new IllegalArgumentException("plugin: " + pluginName + " is not extends from " + pluginBaseClass);
-            }
-            return pluginClass.getDeclaredConstructor().newInstance();
-        }
-        ServiceLoader<Plugin<?>> plugins = ServiceLoader.load(pluginBaseClass, classLoader);
-        for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
-            try {
-                Plugin<?> plugin = it.next();
-                if (StringUtils.equalsIgnoreCase(plugin.getPluginName(), pluginName)) {
-                    return plugin;
-                }
-            } catch (ServiceConfigurationError e) {
-                // Iterator.next() may throw ServiceConfigurationError,
-                // but maybe caused by a not used plugin in this job
-                LOGGER.warn("Error when load plugin: [{}]", pluginName, e);
-            }
-        }
-        throw new ClassNotFoundException("Plugin class not found by name :[" + pluginName + "]");
-    }
-
-    private Class<?> getPluginBaseClass(EngineType engineType, PluginType pluginType) {
-        if (!PLUGIN_BASE_CLASS_MAP.containsKey(engineType)) {
-            throw new IllegalStateException("PluginType not support : [" + pluginType + "]");
-        }
-        Map<PluginType, Class<?>> pluginTypeClassMap = PLUGIN_BASE_CLASS_MAP.get(engineType);
-        if (!pluginTypeClassMap.containsKey(pluginType)) {
-            throw new IllegalStateException(pluginType + " is not supported in engine " + engineType);
-        }
-        return pluginTypeClassMap.get(pluginType);
-    }
-
-}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index 345f8414..0479a552 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -24,11 +24,11 @@ import org.apache.seatunnel.apis.base.env.Execution;
 import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.ExecutionContext;
 import org.apache.seatunnel.core.base.config.ExecutionFactory;
 import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkExecutionContext;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -53,7 +53,7 @@ public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkComm
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder(configFile).getConfig();
-        ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
+        FlinkExecutionContext executionContext = new FlinkExecutionContext(config, engine);
         List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
         List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
         List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
index 7dca74ec..447603aa 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.core.flink.config;
 import org.apache.seatunnel.core.base.config.ConfigChecker;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.EnvironmentFactory;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.config.PluginType;
 import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
@@ -35,10 +33,10 @@ public class FlinkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
             // check environment
             FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
             // check plugins
-            PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
-            pluginFactory.createPlugins(PluginType.SOURCE);
-            pluginFactory.createPlugins(PluginType.TRANSFORM);
-            pluginFactory.createPlugins(PluginType.SINK);
+            FlinkExecutionContext flinkExecutionContext = new FlinkExecutionContext(config, EngineType.FLINK);
+            flinkExecutionContext.getSources();
+            flinkExecutionContext.getTransforms();
+            flinkExecutionContext.getSinks();
         } catch (Exception ex) {
             throw new ConfigCheckException("Config check fail", ex);
         }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
new file mode 100644
index 00000000..24cb9d82
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.config;
+
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.flink.FlinkSourcePluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.flink.FlinkTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnvironment> {
+    private final FlinkSourcePluginDiscovery flinkSourcePluginDiscovery;
+    private final FlinkTransformPluginDiscovery flinkTransformPluginDiscovery;
+    private final FlinkSinkPluginDiscovery flinkSinkPluginDiscovery;
+    private final List<URL> pluginJars;
+
+    public FlinkExecutionContext(Config config, EngineType engine) {
+        super(config, engine);
+        this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
+        this.flinkTransformPluginDiscovery = new FlinkTransformPluginDiscovery();
+        this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        // since we didn't split the transform plugin jars, we just need to register the source/sink plugin jars
+        pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
+        pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
+        this.pluginJars = pluginJars;
+        this.getEnvironment().registerPlugin(pluginJars);
+    }
+
+    @Override
+    public List<BaseSource<FlinkEnvironment>> getSources() {
+        final String pluginType = PluginType.SOURCE.getType();
+        final String engineType = EngineType.FLINK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseSource<FlinkEnvironment> pluginInstance = flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<BaseTransform<FlinkEnvironment>> getTransforms() {
+        final String pluginType = PluginType.TRANSFORM.getType();
+        final String engineType = EngineType.FLINK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseTransform<FlinkEnvironment> pluginInstance = flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<BaseSink<FlinkEnvironment>> getSinks() {
+        final String pluginType = PluginType.SINK.getType();
+        final String engineType = EngineType.FLINK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseSink<FlinkEnvironment> pluginInstance = flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<URL> getPluginJars() {
+        return pluginJars;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 41b53080..4675cb2e 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -19,16 +19,15 @@ package org.apache.seatunnel.core.spark;
 
 import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
 
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginFactory;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -216,13 +215,13 @@ public class SparkStarter implements Starter {
      * return connector's jars, which located in 'connectors/spark/*'.
      */
     private List<Path> getConnectorJarDependencies() {
-        Path pluginRootDir = Common.connectorJarDir("SPARK");
+        Path pluginRootDir = Common.connectorJarDir("spark");
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
             return Collections.emptyList();
         }
         Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
-        PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config, EngineType.SPARK);
-        return pluginFactory.getPluginJarPaths().stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
+        SparkExecutionContext sparkExecutionContext = new SparkExecutionContext(config, EngineType.SPARK);
+        return sparkExecutionContext.getPluginJars().stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
     }
 
     /**
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index 9a051416..e9c54b44 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -24,11 +24,11 @@ import org.apache.seatunnel.apis.base.env.Execution;
 import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.ExecutionContext;
 import org.apache.seatunnel.core.base.config.ExecutionFactory;
 import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +50,7 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
         Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
 
         Config config = new ConfigBuilder(confFile).getConfig();
-        ExecutionContext<SparkEnvironment> executionContext = new ExecutionContext<>(config, engine);
+        SparkExecutionContext executionContext = new SparkExecutionContext(config, engine);
 
         List<BaseSource<SparkEnvironment>> sources = executionContext.getSources();
         List<BaseTransform<SparkEnvironment>> transforms = executionContext.getTransforms();
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
index 26de5794..776e4d66 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.core.spark.config;
 import org.apache.seatunnel.core.base.config.ConfigChecker;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.EnvironmentFactory;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.config.PluginType;
 import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
@@ -33,12 +31,12 @@ public class SparkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
     public void checkConfig(Config config) throws ConfigCheckException {
         try {
             // check environment
-            FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+            FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.SPARK).getEnvironment();
             // check plugins
-            PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
-            pluginFactory.createPlugins(PluginType.SOURCE);
-            pluginFactory.createPlugins(PluginType.TRANSFORM);
-            pluginFactory.createPlugins(PluginType.SINK);
+            SparkExecutionContext sparkExecutionContext = new SparkExecutionContext(config, EngineType.SPARK);
+            sparkExecutionContext.getSources();
+            sparkExecutionContext.getTransforms();
+            sparkExecutionContext.getSinks();
         } catch (Exception ex) {
             throw new ConfigCheckException("Config check fail", ex);
         }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
new file mode 100644
index 00000000..13d1c0f3
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkTransformPluginDiscovery;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkExecutionContext extends AbstractExecutionContext<SparkEnvironment> {
+    private final SparkSourcePluginDiscovery sparkSourcePluginDiscovery;
+    private final SparkTransformPluginDiscovery sparkTransformPluginDiscovery;
+    private final SparkSinkPluginDiscovery sparkSinkPluginDiscovery;
+    private final List<URL> pluginJars;
+
+    public SparkExecutionContext(Config config, EngineType engine) {
+        super(config, engine);
+        this.sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
+        this.sparkTransformPluginDiscovery = new SparkTransformPluginDiscovery();
+        this.sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
+        List<URL> pluginJars = new ArrayList<>();
+        pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
+        pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
+        this.pluginJars = pluginJars;
+        this.getEnvironment().registerPlugin(pluginJars);
+    }
+
+    @Override
+    public List<BaseSource<SparkEnvironment>> getSources() {
+        final String pluginType = PluginType.SOURCE.getType();
+        final String engineType = EngineType.SPARK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseSource<SparkEnvironment> pluginInstance = sparkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<BaseTransform<SparkEnvironment>> getTransforms() {
+        final String pluginType = PluginType.TRANSFORM.getType();
+        final String engineType = EngineType.SPARK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseTransform<SparkEnvironment> pluginInstance = sparkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<BaseSink<SparkEnvironment>> getSinks() {
+        final String pluginType = PluginType.SINK.getType();
+        final String engineType = EngineType.SPARK.getEngine();
+        final List<? extends Config> configList = getRootConfig().getConfigList(pluginType);
+        return configList.stream()
+            .map(pluginConfig -> {
+                PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
+                BaseSink<SparkEnvironment> pluginInstance = sparkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                pluginInstance.setConfig(pluginConfig);
+                return pluginInstance;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<URL> getPluginJars() {
+        return pluginJars;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml b/seatunnel-plugin-discovery/pom.xml
similarity index 79%
copy from seatunnel-core/seatunnel-core-base/pom.xml
copy to seatunnel-plugin-discovery/pom.xml
index 1248fb91..378e0670 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-plugin-discovery/pom.xml
@@ -1,64 +1,50 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-core</artifactId>
+        <artifactId>seatunnel</artifactId>
         <version>${revision}</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-core-base</artifactId>
+    <artifactId>seatunnel-plugin-discovery</artifactId>
 
     <dependencies>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api-flink</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api-spark</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
-            <groupId>com.beust</groupId>
-            <artifactId>jcommander</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
new file mode 100644
index 00000000..8c90dbab
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.config.Common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
+    private final Path pluginDir;
+
+    protected final ConcurrentHashMap<PluginIdentifier, Optional<T>> pluginInstanceMap =
+        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+    protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
+        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+    public AbstractPluginDiscovery(String pluginSubDir) {
+        this.pluginDir = Common.connectorJarDir(pluginSubDir);
+        LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
+    }
+
+    @Override
+    public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
+        return pluginIdentifiers.stream()
+            .map(this::getPluginJarPath)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
+        return pluginIdentifiers.stream()
+            .map(this::getPluginInstance)
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public T getPluginInstance(PluginIdentifier pluginIdentifier) {
+        Optional<T> pluginInstance = pluginInstanceMap
+            .computeIfAbsent(pluginIdentifier, this::createPluginInstance);
+        if (!pluginInstance.isPresent()) {
+            throw new IllegalArgumentException("Can't find plugin: " + pluginIdentifier);
+        }
+        return pluginInstance.get();
+    }
+
+    /**
+     * Get the plugin instance.
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return plugin instance.
+     */
+    protected Optional<URL> getPluginJarPath(PluginIdentifier pluginIdentifier) {
+        return pluginJarPath.computeIfAbsent(pluginIdentifier, this::findPluginJarPath);
+    }
+
+    /**
+     * Get spark plugin interface.
+     *
+     * @return plugin base class.
+     */
+    protected abstract Class<T> getPluginBaseClass();
+
+    /**
+     * Find the plugin jar path;
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return plugin jar path.
+     */
+    private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) {
+        if (PLUGIN_JAR_MAPPING.isEmpty()) {
+            return Optional.empty();
+        }
+        final String engineType = pluginIdentifier.getEngineType().toLowerCase();
+        final String pluginType = pluginIdentifier.getPluginType().toLowerCase();
+        final String pluginName = pluginIdentifier.getPluginName().toLowerCase();
+        if (!PLUGIN_JAR_MAPPING.hasPath(engineType)) {
+            return Optional.empty();
+        }
+        Config engineConfig = PLUGIN_JAR_MAPPING.getConfig(engineType);
+        if (!engineConfig.hasPath(pluginType)) {
+            return Optional.empty();
+        }
+        Config typeConfig = engineConfig.getConfig(pluginType);
+        Optional<Map.Entry<String, ConfigValue>> optional = typeConfig.entrySet().stream()
+            .filter(entry -> StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
+            .findFirst();
+        if (!optional.isPresent()) {
+            return Optional.empty();
+        }
+        String pluginJarPrefix = optional.get().getValue().unwrapped().toString();
+        File[] targetPluginFiles = pluginDir.toFile().listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                return pathname.getName().endsWith(".jar") && StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
+            }
+        });
+        if (ArrayUtils.isEmpty(targetPluginFiles)) {
+            return Optional.empty();
+        }
+        try {
+            URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
+            LOGGER.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
+            return Optional.of(pluginJarPath);
+        } catch (MalformedURLException e) {
+            LOGGER.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+            return Optional.empty();
+        }
+    }
+
+    private Optional<T> createPluginInstance(PluginIdentifier pluginIdentifier) {
+        Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+        ClassLoader classLoader;
+        // if the plugin jar not exist in plugin dir, will load from classpath.
+        if (pluginJarPath.isPresent()) {
+            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, pluginJarPath.get());
+            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+        } else {
+            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+            classLoader = Thread.currentThread().getContextClassLoader();
+        }
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
+        for (T t : serviceLoader) {
+            // todo: add plugin identifier interface to support new api interface.
+            Plugin<?> pluginInstance = (Plugin<?>) t;
+            if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                return Optional.of((T) pluginInstance);
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
new file mode 100644
index 00000000..cdc85860
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import org.apache.seatunnel.common.config.Common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Plugins discovery interface, used to find plugin. Each plugin type should have its own implementation.
+ *
+ * @param <T> plugin type
+ */
+public interface PluginDiscovery<T> {
+
+    String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+    /**
+     * The plugin mapping config.
+     * e,g.flink.source.DruidSource=seatunnel-connector-flink-druid
+     */
+    Config PLUGIN_JAR_MAPPING =
+        ConfigFactory
+            // todo: rename to plugin dir
+            .parseFile(Common.connectorDir().resolve(PLUGIN_MAPPING_FILE).toFile())
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+    /**
+     * Get all plugin jar paths.
+     *
+     * @return plugin jars.
+     */
+    List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers);
+
+    /**
+     * Get plugin instance by plugin identifier.
+     *
+     * @param pluginIdentifier plugin identifier.
+     * @return plugin instance. If not found, throw IllegalArgumentException.
+     */
+    T getPluginInstance(PluginIdentifier pluginIdentifier);
+
+    /**
+     * Get all plugin instances.
+     *
+     * @return plugin instances.
+     */
+    List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers);
+
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
new file mode 100644
index 00000000..97d6e9f8
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery;
+
+import java.util.Objects;
+
+/**
+ * Used to identify a plugin.
+ */
+public class PluginIdentifier {
+    private final String engineType;
+    private final String pluginType;
+    private final String pluginName;
+
+    private PluginIdentifier(String engineType, String pluginType, String pluginName) {
+        this.engineType = engineType;
+        this.pluginType = pluginType;
+        this.pluginName = pluginName;
+    }
+
+    public static PluginIdentifier of(String engineType, String pluginType, String pluginName) {
+        return new PluginIdentifier(engineType, pluginType, pluginName);
+    }
+
+    public String getEngineType() {
+        return engineType;
+    }
+
+    public String getPluginType() {
+        return pluginType;
+    }
+
+    public String getPluginName() {
+        return pluginName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        PluginIdentifier that = (PluginIdentifier) o;
+
+        if (!Objects.equals(engineType, that.engineType)) {
+            return false;
+        }
+        if (!Objects.equals(pluginType, that.pluginType)) {
+            return false;
+        }
+        return Objects.equals(pluginName, that.pluginName);
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:magicnumber")
+    public int hashCode() {
+        int result = engineType != null ? engineType.hashCode() : 0;
+        result = 31 * result + (pluginType != null ? pluginType.hashCode() : 0);
+        result = 31 * result + (pluginName != null ? pluginName.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "PluginIdentifier{" +
+            "engineType='" + engineType + '\'' +
+            ", pluginType='" + pluginType + '\'' +
+            ", pluginName='" + pluginName + '\'' +
+            '}';
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
new file mode 100644
index 00000000..a2185a4a
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSinkPluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkSink;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class FlinkSinkPluginDiscovery extends AbstractPluginDiscovery<BaseFlinkSink> {
+
+    public FlinkSinkPluginDiscovery() {
+        super("flink");
+    }
+
+    @Override
+    protected Class<BaseFlinkSink> getPluginBaseClass() {
+        return BaseFlinkSink.class;
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
new file mode 100644
index 00000000..24ff89e0
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkSourcePluginDiscovery.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkSource;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+
+public class FlinkSourcePluginDiscovery extends AbstractPluginDiscovery<BaseFlinkSource> {
+    public FlinkSourcePluginDiscovery() {
+        super("flink");
+    }
+
+    @Override
+    protected Class<BaseFlinkSource> getPluginBaseClass() {
+        return BaseFlinkSource.class;
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
new file mode 100644
index 00000000..cda5ab6b
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.flink;
+
+import org.apache.seatunnel.flink.BaseFlinkTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseFlinkTransform> {
+
+    public FlinkTransformPluginDiscovery() {
+        super("flink");
+    }
+
+    public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
+        return new ArrayList<>();
+    }
+
+    @Override
+    protected Class<BaseFlinkTransform> getPluginBaseClass() {
+        return BaseFlinkTransform.class;
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java
new file mode 100644
index 00000000..8a30c68e
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSinkPluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkSink;
+
+public class SparkSinkPluginDiscovery extends AbstractPluginDiscovery<BaseSparkSink> {
+
+    public SparkSinkPluginDiscovery() {
+        super("spark");
+    }
+
+    @Override
+    protected Class<BaseSparkSink> getPluginBaseClass() {
+        return BaseSparkSink.class;
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java
new file mode 100644
index 00000000..ee961bf9
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkSourcePluginDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.spark.BaseSparkSource;
+
+public class SparkSourcePluginDiscovery extends AbstractPluginDiscovery<BaseSparkSource> {
+
+    public SparkSourcePluginDiscovery() {
+        super("spark");
+    }
+
+    @Override
+    protected Class<BaseSparkSource> getPluginBaseClass() {
+        return BaseSparkSource.class;
+    }
+}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
new file mode 100644
index 00000000..849fabbd
--- /dev/null
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.spark;
+
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.spark.BaseSparkTransform;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Transform plugin will load from the classpath.
+ */
+public class SparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseSparkTransform> {
+
+    public SparkTransformPluginDiscovery() {
+        super("spark");
+    }
+
+    public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected Class<BaseSparkTransform> getPluginBaseClass() {
+        return BaseSparkTransform.class;
+    }
+}