Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/24 11:51:57 UTC

[dolphinscheduler] branch dev updated: [Improvement-#11613] Add spi priority factory (#11614)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b96d69701a [Improvement-#11613] Add spi priority factory (#11614)
b96d69701a is described below

commit b96d69701a08f25cf43313df80a96d23c35ee36e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Aug 24 19:51:47 2022 +0800

    [Improvement-#11613] Add spi priority factory (#11614)
    
    * Add spi priority factory
    
    * Add doc
    
    * Add override log
    
    * Use lombok
    
    * Add comment
---
 docs/docs/en/contribute/backend/spi/alert.md       |  4 +-
 docs/docs/en/contribute/backend/spi/datasource.md  |  2 +
 docs/docs/en/contribute/backend/spi/task.md        |  2 +
 docs/docs/zh/contribute/backend/spi/alert.md       |  2 +
 docs/docs/zh/contribute/backend/spi/datasource.md  |  2 +
 docs/docs/zh/contribute/backend/spi/task.md        |  2 +
 .../alert/api/AlertChannelFactory.java             |  9 ++-
 .../dolphinscheduler/alert/AlertPluginManager.java | 34 ++++-----
 .../api/plugin/DataSourcePluginManager.java        | 16 ++--
 .../server/master/MasterServer.java                |  1 +
 .../master/runner/task/BaseTaskProcessor.java      |  6 ++
 .../server/master/runner/task/ITaskProcessor.java  |  3 +-
 .../master/runner/task/TaskProcessorFactory.java   | 27 +++----
 .../service/task/TaskPluginManager.java            | 32 ++++----
 .../spi/datasource/DataSourceChannelFactory.java   | 10 ++-
 .../PrioritySPI.java}                              | 20 +++--
 .../spi/plugin/PrioritySPIFactory.java             | 60 +++++++++++++++
 .../dolphinscheduler/spi/plugin/SPIIdentify.java   | 19 ++++-
 .../spi/plugin/PrioritySPIFactoryTest.java         | 88 ++++++++++++++++++++++
 .../plugin/task/api/TaskChannelFactory.java        |  9 ++-
 20 files changed, 271 insertions(+), 77 deletions(-)
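
The resolution rule this commit introduces (same name: higher priority wins; same name and same priority: fail fast) can be illustrated with a minimal, self-contained sketch. The names `PrioritySketch`, `Identify`, `Plugin`, `plugin` and `resolve` are hypothetical stand-ins invented for illustration; they are not the classes added by the commit, and the sketch works on a plain list instead of `ServiceLoader` so that it runs without any plugin on the classpath.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrioritySketch {

    /** Hypothetical stand-in for SPIIdentify: a name plus a priority. */
    static final class Identify {
        final String name;
        final int priority;

        Identify(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Hypothetical stand-in for a PrioritySPI implementation. */
    interface Plugin {
        Identify getIdentify();
    }

    /** Helper (assumption, for illustration only) that builds a plugin with a fixed identity. */
    static Plugin plugin(String name, int priority) {
        Identify identify = new Identify(name, priority);
        return () -> identify;
    }

    /**
     * Keeps one plugin per name. On a name clash the higher priority wins;
     * an equal priority is treated as a configuration error.
     */
    static Map<String, Plugin> resolve(List<Plugin> candidates) {
        Map<String, Plugin> selected = new LinkedHashMap<>();
        for (Plugin candidate : candidates) {
            Identify identify = candidate.getIdentify();
            Plugin existing = selected.get(identify.name);
            if (existing == null) {
                selected.put(identify.name, candidate);
                continue;
            }
            int cmp = Integer.compare(identify.priority, existing.getIdentify().priority);
            if (cmp == 0) {
                throw new IllegalArgumentException(
                        "Two plugins share the name '" + identify.name + "' and the same priority");
            }
            if (cmp > 0) {
                // The newcomer has the higher priority, so it replaces the earlier plugin.
                selected.put(identify.name, candidate);
            }
            // Otherwise the newcomer has the lower priority and is skipped.
        }
        return selected;
    }

    public static void main(String[] args) {
        Plugin builtIn = plugin("Email", 0);  // default priority
        Plugin custom = plugin("Email", 10);  // overriding plugin with a higher priority
        Map<String, Plugin> selected = resolve(Arrays.asList(builtIn, custom));
        System.out.println(selected.get("Email").getIdentify().priority); // prints 10
    }
}
```

In the commit itself, a plugin would presumably raise its priority by overriding `getIdentify` and passing a priority to the `SPIIdentify` builder; the sketch only mirrors the comparison logic, not the real loading path.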

diff --git a/docs/docs/en/contribute/backend/spi/alert.md b/docs/docs/en/contribute/backend/spi/alert.md
index e2629a87a8..9b6c45e547 100644
--- a/docs/docs/en/contribute/backend/spi/alert.md
+++ b/docs/docs/en/contribute/backend/spi/alert.md
@@ -6,7 +6,9 @@ DolphinScheduler is undergoing a microkernel + plug-in architecture change. All
 
 For alarm-related codes, please refer to the `dolphinscheduler-alert-api` module. This module defines the extension interface of the alarm plug-in and some basic codes. When we need to realize the plug-inization of related functions, it is recommended to read the code of this block first. Of course, it is recommended that you read the document. This will reduce a lot of time, but the document There is a certain degree of lag. When the document is missing, it is recommended to take the so [...]
 
-We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple.
+We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple. 
+
+In addition, `AlertChannelFactory` extends `PrioritySPI`, which means you can assign a priority to a plugin. When two plugins have the same name, you can set their priority by overriding the `getIdentify` method, and the plugin with the higher priority is loaded. If two plugins have the same name and the same priority, the server throws an `IllegalArgumentException` when loading the plugins.
 
 By the way, we have adopted an excellent front-end component form-create, which supports the generation of front-end UI components based on JSON. If plug-in development involves the front-end, we will use JSON to generate related front-end UI components, org.apache.dolphinscheduler. The parameters of the plug-in are encapsulated in spi.params, which will convert all the relevant parameters into the corresponding JSON, which means that you can complete the drawing of the front-end compone [...]
 
diff --git a/docs/docs/en/contribute/backend/spi/datasource.md b/docs/docs/en/contribute/backend/spi/datasource.md
index 5772b4357c..9738e07330 100644
--- a/docs/docs/en/contribute/backend/spi/datasource.md
+++ b/docs/docs/en/contribute/backend/spi/datasource.md
@@ -18,6 +18,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
 
 We provide APIs for external access of all data sources in the dolphin scheduler data source API module
 
+In addition, `DataSourceChannelFactory` extends `PrioritySPI`, which means you can assign a priority to a plugin. When two plugins have the same name, you can set their priority by overriding the `getIdentify` method, and the plugin with the higher priority is loaded. If two plugins have the same name and the same priority, the server throws an `IllegalArgumentException` when loading the plugins.
+
 #### **Future plan**
 
 Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc.
\ No newline at end of file
diff --git a/docs/docs/en/contribute/backend/spi/task.md b/docs/docs/en/contribute/backend/spi/task.md
index 70b01d48ff..f909d42fa8 100644
--- a/docs/docs/en/contribute/backend/spi/task.md
+++ b/docs/docs/en/contribute/backend/spi/task.md
@@ -8,6 +8,8 @@ The plug-in can implement the above interface. It mainly includes creating tasks
 
 We provide APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the spi general code library, which defines all the plug-in modules, such as the alarm module, the registry module, etc., you can read and view in detail .
 
+In addition, `TaskChannelFactory` extends `PrioritySPI`, which means you can assign a priority to a plugin. When two plugins have the same name, you can set their priority by overriding the `getIdentify` method, and the plugin with the higher priority is loaded. If two plugins have the same name and the same priority, the server throws an `IllegalArgumentException` when loading the plugins.
+
 *NOTICE*
 
 Since the task plug-in involves the front-end page, the front-end SPI has not yet been implemented, so you need to implement the front-end page corresponding to the plug-in separately.
diff --git a/docs/docs/zh/contribute/backend/spi/alert.md b/docs/docs/zh/contribute/backend/spi/alert.md
index 709802e782..21ea651967 100644
--- a/docs/docs/zh/contribute/backend/spi/alert.md
+++ b/docs/docs/zh/contribute/backend/spi/alert.md
@@ -8,6 +8,8 @@ DolphinScheduler 正在处于微内核 + 插件化的架构更改之中,所有
 
 我们采用了原生的 JAVA-SPI,当你需要扩展的时候,事实上你只需要关注扩展`org.apache.dolphinscheduler.alert.api.AlertChannelFactory`接口即可,底层相关逻辑如插件加载等内核已经实现,这让我们的开发更加专注且简单。
 
+另外,`AlertChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。
+
 顺便提一句,我们采用了一款优秀的前端组件 form-create,它支持基于 json 生成前端 ui 组件,如果插件开发牵扯到前端,我们会通过 json 来生成相关前端 UI 组件,org.apache.dolphinscheduler.spi.params 里面对插件的参数做了封装,它会将相关参数全部全部转化为对应的 json,这意味这你完全可以通过 Java 代码的方式完成前端组件的绘制(这里主要是表单,我们只关心前后端交互的数据)。
 
 本文主要着重讲解 Alert 告警相关设计以及开发。
diff --git a/docs/docs/zh/contribute/backend/spi/datasource.md b/docs/docs/zh/contribute/backend/spi/datasource.md
index 1868c86d9e..a2fc4b59de 100644
--- a/docs/docs/zh/contribute/backend/spi/datasource.md
+++ b/docs/docs/zh/contribute/backend/spi/datasource.md
@@ -17,6 +17,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
 
 我们在 dolphinscheduler-datasource-api 模块提供了所有数据源对外访问的 API
 
+另外,DataSourceChannelFactory 继承自PrioritySPI,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写getIdentify 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。
+
 #### **未来计划**
 
 支持kafka、http、文件、sparkSQL、FlinkSQL等数据源
diff --git a/docs/docs/zh/contribute/backend/spi/task.md b/docs/docs/zh/contribute/backend/spi/task.md
index b2ee5242b5..fb0fe88fbe 100644
--- a/docs/docs/zh/contribute/backend/spi/task.md
+++ b/docs/docs/zh/contribute/backend/spi/task.md
@@ -8,6 +8,8 @@ org.apache.dolphinscheduler.spi.task.TaskChannel
 
 我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。
 
+另外,`TaskChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。
+
 *NOTICE*
 
 由于任务插件涉及到前端页面,目前前端的SPI还没有实现,因此你需要单独实现插件对应的前端页面。
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
index 2fa328e213..0f9878485c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
@@ -20,15 +20,18 @@
 package org.apache.dolphinscheduler.alert.api;
 
 import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
 
 import java.util.List;
 
 /**
  * alert channel factory
  */
-public interface AlertChannelFactory {
+public interface AlertChannelFactory extends PrioritySPI {
     /**
      * Returns the name of the alert channel
+     *
      * @return the name of the alert channel
      */
     String name();
@@ -44,4 +47,8 @@ public interface AlertChannelFactory {
      * Returns the configurable parameters that this plugin needs to display on the web ui
      */
     List<PluginParams> params();
+
+    default SPIIdentify getIdentify() {
+        return SPIIdentify.builder().name(name()).build();
+    }
 }
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
index cc84769304..782bf0aa85 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.alert;
 
-import static java.lang.String.format;
-
 import org.apache.dolphinscheduler.alert.api.AlertChannel;
 import org.apache.dolphinscheduler.alert.api.AlertChannelFactory;
 import org.apache.dolphinscheduler.alert.api.AlertConstants;
@@ -31,21 +29,20 @@ import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
 import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 import org.apache.dolphinscheduler.spi.params.base.Validate;
 import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.ServiceLoader;
-import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
-import org.springframework.context.event.EventListener;
-import org.springframework.stereotype.Component;
+import static java.lang.String.format;
 
 @Component
 public final class AlertPluginManager {
@@ -74,20 +71,17 @@ public final class AlertPluginManager {
 
     @EventListener
     public void installPlugin(ApplicationReadyEvent readyEvent) {
-        final Set<String> names = new HashSet<>();
-
-        ServiceLoader.load(AlertChannelFactory.class).forEach(factory -> {
-            final String name = factory.name();
 
-            logger.info("Registering alert plugin: {}", name);
+        PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class);
+        for (Map.Entry<String, AlertChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+            String name = entry.getKey();
+            AlertChannelFactory factory = entry.getValue();
 
-            if (!names.add(name)) {
-                throw new IllegalStateException(format("Duplicate alert plugins named '%s'", name));
-            }
+            logger.info("Registering alert plugin: {} - {}", name, factory.getClass());
 
             final AlertChannel alertChannel = factory.create();
 
-            logger.info("Registered alert plugin: {}", name);
+            logger.info("Registered alert plugin: {} - {}", name, factory.getClass());
 
             final List<PluginParams> params = new ArrayList<>(factory.params());
             params.add(0, warningTypeParams);
@@ -98,7 +92,7 @@ public final class AlertPluginManager {
             final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
 
             channelKeyedById.put(id, alertChannel);
-        });
+        }
     }
 
     public Optional<AlertChannel> getAlertChannel(int id) {
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
index fd31579a7c..0610ed6d7b 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
@@ -17,18 +17,18 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
-import static java.lang.String.format;
-
 import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.lang.String.format;
 
 public class DataSourcePluginManager {
     private static final Logger logger = LoggerFactory.getLogger(DataSourcePluginManager.class);
@@ -41,8 +41,10 @@ public class DataSourcePluginManager {
 
     public void installPlugin() {
 
-        ServiceLoader.load(DataSourceChannelFactory.class).forEach(factory -> {
-            final String name = factory.getName();
+        PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(DataSourceChannelFactory.class);
+        for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+            final DataSourceChannelFactory factory = entry.getValue();
+            final String name = entry.getKey();
 
             logger.info("Registering datasource plugin: {}", name);
 
@@ -53,7 +55,7 @@ public class DataSourcePluginManager {
             loadDatasourceClient(factory);
 
             logger.info("Registered datasource plugin: {}", name);
-        });
+        }
     }
 
     private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6419a97365..06d2c3ceb0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.quartz.SchedulerException;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 7b5f7c66f7..8651dc018a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -261,6 +262,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         throw new UnsupportedOperationException("This abstract class doesn's has type");
     }
 
+    @Override
+    public SPIIdentify getIdentify() {
+        return SPIIdentify.builder().name(getType()).build();
+    }
+
     @Override
     public TaskInstance taskInstance() {
         return this.taskInstance;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index d7d241b7f6..de0a0a7c23 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -19,11 +19,12 @@ package org.apache.dolphinscheduler.server.master.runner.task;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
 
 /**
  * interface of task processor in master
  */
-public interface ITaskProcessor {
+public interface ITaskProcessor extends PrioritySPI {
 
     void init(TaskInstance taskInstance, ProcessInstance processInstance);
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 9595c46dd8..f585c94bbf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -17,24 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_STREAM;
-
+import lombok.experimental.UtilityClass;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
-import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.experimental.UtilityClass;
-
-import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 
 /**
  * the factory to create task processor
@@ -44,16 +38,19 @@ public final class TaskProcessorFactory {
 
     private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
 
-    public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
+    private static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
 
     private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
 
     static {
-        for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
+        PrioritySPIFactory<ITaskProcessor> prioritySPIFactory = new PrioritySPIFactory<>(ITaskProcessor.class);
+        for (Map.Entry<String, ITaskProcessor> entry : prioritySPIFactory.getSPIMap().entrySet()) {
             try {
-                PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+                logger.info("Registering task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
+                PROCESS_MAP.put(entry.getKey(), (Constructor<ITaskProcessor>) entry.getValue().getClass().getConstructor());
+                logger.info("Registered task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
             } catch (NoSuchMethodException e) {
-                throw new IllegalArgumentException("The task processor should has a no args constructor", e);
+                throw new IllegalArgumentException(String.format("The task processor: %s should have a no-args constructor", entry.getKey()), e);
             }
         }
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 1289b5718a..e3776e09ba 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -17,25 +17,21 @@
 
 package org.apache.dolphinscheduler.service.task;
 
-import static java.lang.String.format;
-
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.ServiceLoader;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
 @Component
 public class TaskPluginManager {
     private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
@@ -53,19 +49,19 @@ public class TaskPluginManager {
             logger.warn("The task plugin has already been loaded");
             return;
         }
-        ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
-            final String name = factory.getName();
+        PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
+        for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+            String factoryName = entry.getKey();
+            TaskChannelFactory factory = entry.getValue();
 
-            logger.info("Registering task plugin: {}", name);
+            logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass());
 
-            if (taskChannelFactoryMap.containsKey(name)) {
-                throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
-            }
-            taskChannelFactoryMap.put(name, factory);
-            taskChannelMap.put(name, factory.create());
+            taskChannelFactoryMap.put(factoryName, factory);
+            taskChannelMap.put(factoryName, factory.create());
+
+            logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass());
+        }
 
-            logger.info("Registered task plugin: {}", name);
-        });
     }
 
     public Map<String, TaskChannel> getTaskChannelMap() {
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
index c947c3a647..eceb39fda9 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
@@ -17,7 +17,10 @@
 
 package org.apache.dolphinscheduler.spi.datasource;
 
-public interface DataSourceChannelFactory {
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
+
+public interface DataSourceChannelFactory extends PrioritySPI {
     /**
      * get datasource client
      */
@@ -27,4 +30,9 @@ public interface DataSourceChannelFactory {
      * get registry component name
      */
     String getName();
+
+    @Override
+    default SPIIdentify getIdentify() {
+        return SPIIdentify.builder().name(getName()).build();
+    }
 }
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
similarity index 63%
copy from dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
copy to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
index c947c3a647..094b39ce64 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.spi.datasource;
+package org.apache.dolphinscheduler.spi.plugin;
 
-public interface DataSourceChannelFactory {
-    /**
-     * get datasource client
-     */
-    DataSourceChannel create();
+public interface PrioritySPI extends Comparable<Integer> {
 
     /**
-     * get registry component name
+     * The SPI identifier. If two plugins have the same name, the one with the higher priority is loaded.
+     * If both the name and the priority are the same, an <code>IllegalArgumentException</code> is thrown.
+     * @return the identifier (name and priority) of this SPI implementation
      */
-    String getName();
+    SPIIdentify getIdentify();
+
+    @Override
+    default int compareTo(Integer o) {
+        return Integer.compare(getIdentify().getPriority(), o);
+    }
+
 }
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java
new file mode 100644
index 0000000000..8b1921b700
--- /dev/null
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+@Slf4j
+public class PrioritySPIFactory<T extends PrioritySPI> {
+
+    private final Map<String, T> map = new HashMap<>();
+
+    public PrioritySPIFactory(Class<T> spiClass) {
+        for (T t : ServiceLoader.load(spiClass)) {
+            if (map.containsKey(t.getIdentify().getName())) {
+                resolveConflict(t);
+            } else {
+                map.put(t.getIdentify().getName(), t);
+            }
+        }
+    }
+
+    public Map<String, T> getSPIMap() {
+        return Collections.unmodifiableMap(map);
+    }
+
+    private void resolveConflict(T newSPI) {
+        SPIIdentify identify = newSPI.getIdentify();
+        T oldSPI = map.get(identify.getName());
+
+        if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) == 0) {
+            throw new IllegalArgumentException(String.format("These two SPI plugins have the same identify name and the same priority: %s, %s",
+                    oldSPI.getIdentify(), newSPI.getIdentify()));
+        } else if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) > 0) {
+            log.info("The {} plugin has a higher priority and will override {}", newSPI.getIdentify(), oldSPI);
+            map.put(identify.getName(), newSPI);
+        } else {
+            log.info("The lower-priority plugin {} will be skipped", newSPI);
+        }
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
similarity index 70%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
copy to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
index 643c4cdb70..e55de1c466 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
@@ -15,11 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.spi.plugin;
 
-import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
 
-public interface TaskChannelFactory extends UiChannelFactory {
+@Data
+@Builder
+@AllArgsConstructor
+public class SPIIdentify {
+
+    private static final int DEFAULT_PRIORITY = 0;
+
+    private String name;
+
+    @Builder.Default
+    private int priority = DEFAULT_PRIORITY;
 
-    TaskChannel create();
 }
diff --git a/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java
new file mode 100644
index 0000000000..4ed3519a6d
--- /dev/null
+++ b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.plugin;
+
+import com.google.auto.service.AutoService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class PrioritySPIFactoryTest {
+
+    @Test
+    public void loadHighPriority() {
+        PrioritySPIFactory<LoadHighPriorityConflictTestSPI> factory = new PrioritySPIFactory<>(LoadHighPriorityConflictTestSPI.class);
+        Map<String, LoadHighPriorityConflictTestSPI> spiMap = factory.getSPIMap();
+        Assert.assertEquals(1, spiMap.get("A").getIdentify().getPriority());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwExceptionWhenPriorityIsSame() {
+        // SPIB and SPIBB share the name "B" and priority 0, so the constructor is expected to throw.
+        // Nothing after the constructor call would run, so no further assertion is needed.
+        new PrioritySPIFactory<>(ThrowExceptionConflictTestSPI.class);
+    }
+
+
+    public interface LoadHighPriorityConflictTestSPI extends PrioritySPI {
+
+    }
+
+    @AutoService(LoadHighPriorityConflictTestSPI.class)
+    public static class SPIA implements LoadHighPriorityConflictTestSPI {
+
+        @Override
+        public SPIIdentify getIdentify() {
+            return SPIIdentify.builder().name("A").priority(0).build();
+        }
+    }
+
+    @AutoService(LoadHighPriorityConflictTestSPI.class)
+    public static class SPIAA implements LoadHighPriorityConflictTestSPI {
+
+        @Override
+        public SPIIdentify getIdentify() {
+            return SPIIdentify.builder().name("A").priority(1).build();
+        }
+    }
+
+    public interface ThrowExceptionConflictTestSPI extends PrioritySPI {
+
+    }
+
+    @AutoService(ThrowExceptionConflictTestSPI.class)
+    public static class SPIB implements ThrowExceptionConflictTestSPI {
+
+        @Override
+        public SPIIdentify getIdentify() {
+            return SPIIdentify.builder().name("B").priority(0).build();
+        }
+    }
+
+    @AutoService(ThrowExceptionConflictTestSPI.class)
+    public static class SPIBB implements ThrowExceptionConflictTestSPI {
+
+        @Override
+        public SPIIdentify getIdentify() {
+            return SPIIdentify.builder().name("B").priority(0).build();
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
index 643c4cdb70..51bf1957e4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
@@ -18,8 +18,15 @@
 package org.apache.dolphinscheduler.plugin.task.api;
 
 import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
 
-public interface TaskChannelFactory extends UiChannelFactory {
+public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {
 
     TaskChannel create();
+
+    default SPIIdentify getIdentify() {
+        return SPIIdentify.builder().name(getName()).build();
+    }
+
 }
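
The default getIdentify() above gives every task plugin the default priority 0, so a replacement plugin would presumably need to override it to win a name conflict. The block below is a hedged, self-contained sketch of that idea: IdentifyOverrideSketch, Identify, ChannelFactory, BuiltInShell, CustomShell, and the "SHELL" name are all hypothetical stand-ins, not types or values from this commit.

```java
final class IdentifyOverrideSketch {

    // Hypothetical stand-in for SPIIdentify (name plus priority).
    static final class Identify {
        final String name;
        final int priority;

        Identify(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Hypothetical stand-in for a channel factory with a default identify.
    interface ChannelFactory {
        String getName();

        // Mirrors the default in the diff: the plugin name with the default priority 0.
        default Identify getIdentify() {
            return new Identify(getName(), 0);
        }
    }

    // A plugin that relies on the default identify.
    static final class BuiltInShell implements ChannelFactory {
        @Override
        public String getName() {
            return "SHELL";
        }
    }

    // A plugin that reuses the same name but declares a higher priority.
    static final class CustomShell implements ChannelFactory {
        @Override
        public String getName() {
            return "SHELL";
        }

        @Override
        public Identify getIdentify() {
            return new Identify(getName(), 1);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BuiltInShell().getIdentify().priority); // prints 0
        System.out.println(new CustomShell().getIdentify().priority);  // prints 1
    }
}
```

Keeping the default in the interface means existing plugins need no change, while a custom plugin opts in to overriding a built-in one by raising its priority.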