Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/24 11:51:57 UTC
[dolphinscheduler] branch dev updated: [Improvement-#11613] Add spi priority factory (#11614)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b96d69701a [Improvement-#11613] Add spi priority factory (#11614)
b96d69701a is described below
commit b96d69701a08f25cf43313df80a96d23c35ee36e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Aug 24 19:51:47 2022 +0800
[Improvement-#11613] Add spi priority factory (#11614)
* Add spi priority factory
* Add doc
* Add override log
* Use lombok
* Add comment
---
docs/docs/en/contribute/backend/spi/alert.md | 4 +-
docs/docs/en/contribute/backend/spi/datasource.md | 2 +
docs/docs/en/contribute/backend/spi/task.md | 2 +
docs/docs/zh/contribute/backend/spi/alert.md | 2 +
docs/docs/zh/contribute/backend/spi/datasource.md | 2 +
docs/docs/zh/contribute/backend/spi/task.md | 2 +
.../alert/api/AlertChannelFactory.java | 9 ++-
.../dolphinscheduler/alert/AlertPluginManager.java | 34 ++++-----
.../api/plugin/DataSourcePluginManager.java | 16 ++--
.../server/master/MasterServer.java | 1 +
.../master/runner/task/BaseTaskProcessor.java | 6 ++
.../server/master/runner/task/ITaskProcessor.java | 3 +-
.../master/runner/task/TaskProcessorFactory.java | 27 +++----
.../service/task/TaskPluginManager.java | 32 ++++----
.../spi/datasource/DataSourceChannelFactory.java | 10 ++-
.../PrioritySPI.java} | 20 +++--
.../spi/plugin/PrioritySPIFactory.java | 60 +++++++++++++++
.../dolphinscheduler/spi/plugin/SPIIdentify.java | 19 ++++-
.../spi/plugin/PrioritySPIFactoryTest.java | 88 ++++++++++++++++++++++
.../plugin/task/api/TaskChannelFactory.java | 9 ++-
20 files changed, 271 insertions(+), 77 deletions(-)
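
For readers skimming the patch, the conflict rule that the new `PrioritySPIFactory` applies can be sketched in isolation. This is a minimal illustration, not the committed code: `Identify` and `resolve` are hypothetical stand-ins for `SPIIdentify` and the factory constructor, and a plain list replaces `ServiceLoader` discovery.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PriorityResolutionSketch {

    /** Hypothetical stand-in for SPIIdentify: a plugin name plus a priority. */
    static final class Identify {
        final String name;
        final int priority;

        Identify(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public String toString() {
            return name + "(priority=" + priority + ")";
        }
    }

    /** Keeps one entry per name: a higher priority replaces the current entry, an equal priority is rejected. */
    static Map<String, Identify> resolve(List<Identify> discovered) {
        Map<String, Identify> byName = new HashMap<>();
        for (Identify candidate : discovered) {
            Identify existing = byName.get(candidate.name);
            if (existing == null || candidate.priority > existing.priority) {
                byName.put(candidate.name, candidate);
            } else if (candidate.priority == existing.priority) {
                throw new IllegalArgumentException(
                        "Conflicting plugins with the same name and priority: " + existing + ", " + candidate);
            }
            // Otherwise the candidate has a lower priority than the current entry and is skipped.
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, Identify> loaded = resolve(Arrays.asList(
                new Identify("A", 0), new Identify("A", 1), new Identify("B", 0)));
        System.out.println(loaded.get("A"));
        System.out.println(loaded.get("B"));
    }
}
```

As in the patch, each candidate is compared only against the entry currently held for its name, so the outcome of a same-priority clash can depend on discovery order.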
diff --git a/docs/docs/en/contribute/backend/spi/alert.md b/docs/docs/en/contribute/backend/spi/alert.md
index e2629a87a8..9b6c45e547 100644
--- a/docs/docs/en/contribute/backend/spi/alert.md
+++ b/docs/docs/en/contribute/backend/spi/alert.md
@@ -6,7 +6,9 @@ DolphinScheduler is undergoing a microkernel + plug-in architecture change. All
For alarm-related codes, please refer to the `dolphinscheduler-alert-api` module. This module defines the extension interface of the alarm plug-in and some basic codes. When we need to realize the plug-inization of related functions, it is recommended to read the code of this block first. Of course, it is recommended that you read the document. This will reduce a lot of time, but the document There is a certain degree of lag. When the document is missing, it is recommended to take the so [...]
-We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple.
+We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple.
+
+In addition, `AlertChannelFactory` extends `PrioritySPI`, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the `getIdentify` method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
By the way, we have adopted an excellent front-end component form-create, which supports the generation of front-end UI components based on JSON. If plug-in development involves the front-end, we will use JSON to generate related front-end UI components, org.apache.dolphinscheduler. The parameters of the plug-in are encapsulated in spi.params, which will convert all the relevant parameters into the corresponding JSON, which means that you can complete the drawing of the front-end compone [...]
diff --git a/docs/docs/en/contribute/backend/spi/datasource.md b/docs/docs/en/contribute/backend/spi/datasource.md
index 5772b4357c..9738e07330 100644
--- a/docs/docs/en/contribute/backend/spi/datasource.md
+++ b/docs/docs/en/contribute/backend/spi/datasource.md
@@ -18,6 +18,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
We provide APIs for external access of all data sources in the dolphin scheduler data source API module
+In addition, `DataSourceChannelFactory` extends `PrioritySPI`, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the `getIdentify` method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
+
#### **Future plan**
Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc.
\ No newline at end of file
diff --git a/docs/docs/en/contribute/backend/spi/task.md b/docs/docs/en/contribute/backend/spi/task.md
index 70b01d48ff..f909d42fa8 100644
--- a/docs/docs/en/contribute/backend/spi/task.md
+++ b/docs/docs/en/contribute/backend/spi/task.md
@@ -8,6 +8,8 @@ The plug-in can implement the above interface. It mainly includes creating tasks
We provide APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the spi general code library, which defines all the plug-in modules, such as the alarm module, the registry module, etc., you can read and view in detail .
+In addition, `TaskChannelFactory` extends `PrioritySPI`, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the `getIdentify` method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
+
*NOTICE*
Since the task plug-in involves the front-end page, the front-end SPI has not yet been implemented, so you need to implement the front-end page corresponding to the plug-in separately.
diff --git a/docs/docs/zh/contribute/backend/spi/alert.md b/docs/docs/zh/contribute/backend/spi/alert.md
index 709802e782..21ea651967 100644
--- a/docs/docs/zh/contribute/backend/spi/alert.md
+++ b/docs/docs/zh/contribute/backend/spi/alert.md
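@@ -8,6 +8,8 @@ DolphinScheduler is undergoing a microkernel + plug-in architecture change. All
We use the native JAVA-SPI. When you need to extend it, you only need to focus on extending the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface; the underlying logic, such as plug-in loading, is already implemented by the kernel, which makes development more focused and simpler.
+In addition, `AlertChannelFactory` extends `PrioritySPI`, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the `getIdentify` method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
+
By the way, we have adopted an excellent front-end component, form-create, which supports generating front-end UI components from JSON. If plug-in development involves the front end, we use JSON to generate the relevant front-end UI components. The plug-in parameters are encapsulated in org.apache.dolphinscheduler.spi.params, which converts all the relevant parameters into the corresponding JSON. This means you can draw the front-end components entirely in Java code (mainly forms here; we only care about the data exchanged between the front end and the back end).
This article focuses on the design and development of Alert alarms.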
diff --git a/docs/docs/zh/contribute/backend/spi/datasource.md b/docs/docs/zh/contribute/backend/spi/datasource.md
index 1868c86d9e..a2fc4b59de 100644
--- a/docs/docs/zh/contribute/backend/spi/datasource.md
+++ b/docs/docs/zh/contribute/backend/spi/datasource.md
@@ -17,6 +17,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
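We provide the APIs for external access to all data sources in the dolphinscheduler-datasource-api module
+In addition, DataSourceChannelFactory extends PrioritySPI, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the getIdentify method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
+
#### **Future plan**
Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc.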
diff --git a/docs/docs/zh/contribute/backend/spi/task.md b/docs/docs/zh/contribute/backend/spi/task.md
index b2ee5242b5..fb0fe88fbe 100644
--- a/docs/docs/zh/contribute/backend/spi/task.md
+++ b/docs/docs/zh/contribute/backend/spi/task.md
@@ -8,6 +8,8 @@ org.apache.dolphinscheduler.spi.task.TaskChannel
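We provide the APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the general SPI code library, which defines all the plug-in modules, such as the alarm module and the registry module. You can read through it in detail.
+In addition, `TaskChannelFactory` extends `PrioritySPI`, which means you can set the plugin priority. When two plugins have the same name, you can customize the priority by overriding the `getIdentify` method. The higher-priority plugin will be loaded, but if two plugins have the same name and the same priority, the server will throw `IllegalArgumentException` when loading the plugins.
+
*NOTICE*
Since the task plug-in involves the front-end page, and the front-end SPI has not yet been implemented, you need to implement the front-end page corresponding to the plug-in separately.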
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
index 2fa328e213..0f9878485c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
@@ -20,15 +20,18 @@
package org.apache.dolphinscheduler.alert.api;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
import java.util.List;
/**
* alert channel factory
*/
-public interface AlertChannelFactory {
+public interface AlertChannelFactory extends PrioritySPI {
/**
* Returns the name of the alert channel
+ *
* @return the name of the alert channel
*/
String name();
@@ -44,4 +47,8 @@ public interface AlertChannelFactory {
* Returns the configurable parameters that this plugin needs to display on the web ui
*/
List<PluginParams> params();
+
+ default SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name(name()).build();
+ }
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
index cc84769304..782bf0aa85 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.alert;
-import static java.lang.String.format;
-
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertChannelFactory;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
@@ -31,21 +29,20 @@ import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.ServiceLoader;
-import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
-import org.springframework.context.event.EventListener;
-import org.springframework.stereotype.Component;
+import static java.lang.String.format;
@Component
public final class AlertPluginManager {
@@ -74,20 +71,17 @@ public final class AlertPluginManager {
@EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) {
- final Set<String> names = new HashSet<>();
-
- ServiceLoader.load(AlertChannelFactory.class).forEach(factory -> {
- final String name = factory.name();
- logger.info("Registering alert plugin: {}", name);
+ PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class);
+ for (Map.Entry<String, AlertChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+ String name = entry.getKey();
+ AlertChannelFactory factory = entry.getValue();
- if (!names.add(name)) {
- throw new IllegalStateException(format("Duplicate alert plugins named '%s'", name));
- }
+ logger.info("Registering alert plugin: {} - {}", name, factory.getClass());
final AlertChannel alertChannel = factory.create();
- logger.info("Registered alert plugin: {}", name);
+ logger.info("Registered alert plugin: {} - {}", name, factory.getClass());
final List<PluginParams> params = new ArrayList<>(factory.params());
params.add(0, warningTypeParams);
@@ -98,7 +92,7 @@ public final class AlertPluginManager {
final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
channelKeyedById.put(id, alertChannel);
- });
+ }
}
public Optional<AlertChannel> getAlertChannel(int id) {
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
index fd31579a7c..0610ed6d7b 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
@@ -17,18 +17,18 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-import static java.lang.String.format;
-
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.lang.String.format;
public class DataSourcePluginManager {
private static final Logger logger = LoggerFactory.getLogger(DataSourcePluginManager.class);
@@ -41,8 +41,10 @@ public class DataSourcePluginManager {
public void installPlugin() {
- ServiceLoader.load(DataSourceChannelFactory.class).forEach(factory -> {
- final String name = factory.getName();
+ PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(DataSourceChannelFactory.class);
+ for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+ final DataSourceChannelFactory factory = entry.getValue();
+ final String name = entry.getKey();
logger.info("Registering datasource plugin: {}", name);
@@ -53,7 +55,7 @@ public class DataSourcePluginManager {
loadDatasourceClient(factory);
logger.info("Registered datasource plugin: {}", name);
- });
+ }
}
private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6419a97365..06d2c3ceb0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.quartz.SchedulerException;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 7b5f7c66f7..8651dc018a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -261,6 +262,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
throw new UnsupportedOperationException("This abstract class doesn's has type");
}
+ @Override
+ public SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name(getType()).build();
+ }
+
@Override
public TaskInstance taskInstance() {
return this.taskInstance;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index d7d241b7f6..de0a0a7c23 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -19,11 +19,12 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
/**
* interface of task processor in master
*/
-public interface ITaskProcessor {
+public interface ITaskProcessor extends PrioritySPI {
void init(TaskInstance taskInstance, ProcessInstance processInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 9595c46dd8..f585c94bbf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -17,24 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_STREAM;
-
+import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
-import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.experimental.UtilityClass;
-
-import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
/**
* the factory to create task processor
@@ -44,16 +38,19 @@ public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
- public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
+ private static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
- for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
+ PrioritySPIFactory<ITaskProcessor> prioritySPIFactory = new PrioritySPIFactory<>(ITaskProcessor.class);
+ for (Map.Entry<String, ITaskProcessor> entry : prioritySPIFactory.getSPIMap().entrySet()) {
try {
- PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+ logger.info("Registering task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
+ PROCESS_MAP.put(entry.getKey(), (Constructor<ITaskProcessor>) entry.getValue().getClass().getConstructor());
+ logger.info("Registered task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
} catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("The task processor should has a no args constructor", e);
+ throw new IllegalArgumentException(String.format("The task processor: %s should has a no args constructor", entry.getKey()));
}
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 1289b5718a..e3776e09ba 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -17,25 +17,21 @@
package org.apache.dolphinscheduler.service.task;
-import static java.lang.String.format;
-
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
@Component
public class TaskPluginManager {
private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
@@ -53,19 +49,19 @@ public class TaskPluginManager {
logger.warn("The task plugin has already been loaded");
return;
}
- ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
- final String name = factory.getName();
+ PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
+ for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
+ String factoryName = entry.getKey();
+ TaskChannelFactory factory = entry.getValue();
- logger.info("Registering task plugin: {}", name);
+ logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass());
- if (taskChannelFactoryMap.containsKey(name)) {
- throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
- }
- taskChannelFactoryMap.put(name, factory);
- taskChannelMap.put(name, factory.create());
+ taskChannelFactoryMap.put(factoryName, factory);
+ taskChannelMap.put(factoryName, factory.create());
+
+ logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass());
+ }
- logger.info("Registered task plugin: {}", name);
- });
}
public Map<String, TaskChannel> getTaskChannelMap() {
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
index c947c3a647..eceb39fda9 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
@@ -17,7 +17,10 @@
package org.apache.dolphinscheduler.spi.datasource;
-public interface DataSourceChannelFactory {
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
+
+public interface DataSourceChannelFactory extends PrioritySPI {
/**
* get datasource client
*/
@@ -27,4 +30,9 @@ public interface DataSourceChannelFactory {
* get registry component name
*/
String getName();
+
+ @Override
+ default SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name(getName()).build();
+ }
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
similarity index 63%
copy from dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
copy to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
index c947c3a647..094b39ce64 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
@@ -15,16 +15,20 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.spi.datasource;
+package org.apache.dolphinscheduler.spi.plugin;
-public interface DataSourceChannelFactory {
- /**
- * get datasource client
- */
- DataSourceChannel create();
+public interface PrioritySPI extends Comparable<Integer> {
/**
- * get registry component name
+ * The SPI identify. If two plugins have the same name, the one with the higher priority will be loaded.
+ * If both the name and the priority are the same, <code>IllegalArgumentException</code> will be thrown.
+ * @return the identify (name and priority) of this SPI implementation
*/
- String getName();
+ SPIIdentify getIdentify();
+
+ @Override
+ default int compareTo(Integer o) {
+ return Integer.compare(getIdentify().getPriority(), o);
+ }
+
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java
new file mode 100644
index 0000000000..8b1921b700
--- /dev/null
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+@Slf4j
+public class PrioritySPIFactory<T extends PrioritySPI> {
+
+ private final Map<String, T> map = new HashMap<>();
+
+ public PrioritySPIFactory(Class<T> spiClass) {
+ for (T t : ServiceLoader.load(spiClass)) {
+ if (map.containsKey(t.getIdentify().getName())) {
+ resolveConflict(t);
+ } else {
+ map.put(t.getIdentify().getName(), t);
+ }
+ }
+ }
+
+ public Map<String, T> getSPIMap() {
+ return Collections.unmodifiableMap(map);
+ }
+
+ private void resolveConflict(T newSPI) {
+ SPIIdentify identify = newSPI.getIdentify();
+ T oldSPI = map.get(identify.getName());
+
+ if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) == 0) {
+ throw new IllegalArgumentException(String.format("These two spi plugins has conflict identify name with the same priority: %s, %s",
+ oldSPI.getIdentify(), newSPI.getIdentify()));
+ } else if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) > 0) {
+ log.info("The {} plugin has high priority, will override {}", newSPI.getIdentify(), oldSPI);
+ map.put(identify.getName(), newSPI);
+ } else {
+ log.info("The low plugin {} will be skipped", newSPI);
+ }
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
similarity index 70%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
copy to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
index 643c4cdb70..e55de1c466 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
@@ -15,11 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.api;
+package org.apache.dolphinscheduler.spi.plugin;
-import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
-public interface TaskChannelFactory extends UiChannelFactory {
+@Data
+@Builder
+@AllArgsConstructor
+public class SPIIdentify {
+
+ private static final int DEFAULT_PRIORITY = 0;
+
+ private String name;
+
+ @Builder.Default
+ private int priority = DEFAULT_PRIORITY;
- TaskChannel create();
}
diff --git a/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java
new file mode 100644
index 0000000000..4ed3519a6d
--- /dev/null
+++ b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.plugin;
+
+import com.google.auto.service.AutoService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class PrioritySPIFactoryTest {
+
+ @Test
+ public void loadHighPriority() {
+ PrioritySPIFactory<LoadHighPriorityConflictTestSPI> factory = new PrioritySPIFactory<>(LoadHighPriorityConflictTestSPI.class);
+ Map<String, LoadHighPriorityConflictTestSPI> spiMap = factory.getSPIMap();
+ Assert.assertEquals(1, spiMap.get("A").getIdentify().getPriority());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwExceptionWhenPriorityIsSame() {
+ PrioritySPIFactory<ThrowExceptionConflictTestSPI> factory = new PrioritySPIFactory<>(ThrowExceptionConflictTestSPI.class);
+ Map<String, ThrowExceptionConflictTestSPI> spiMap = factory.getSPIMap();
+ Assert.assertEquals(0, spiMap.get("B").getIdentify().getPriority());
+ }
+
+
+ public interface LoadHighPriorityConflictTestSPI extends PrioritySPI {
+
+ }
+
+ @AutoService(LoadHighPriorityConflictTestSPI.class)
+ public static class SPIA implements LoadHighPriorityConflictTestSPI {
+
+ @Override
+ public SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name("A").priority(0).build();
+ }
+ }
+
+ @AutoService(LoadHighPriorityConflictTestSPI.class)
+ public static class SPIAA implements LoadHighPriorityConflictTestSPI {
+
+ @Override
+ public SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name("A").priority(1).build();
+ }
+ }
+
+ public interface ThrowExceptionConflictTestSPI extends PrioritySPI {
+
+ }
+
+ @AutoService(ThrowExceptionConflictTestSPI.class)
+ public static class SPIB implements ThrowExceptionConflictTestSPI {
+
+ @Override
+ public SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name("B").priority(0).build();
+ }
+ }
+
+ @AutoService(ThrowExceptionConflictTestSPI.class)
+ public static class SPIBB implements ThrowExceptionConflictTestSPI {
+
+ @Override
+ public SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name("B").priority(0).build();
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
index 643c4cdb70..51bf1957e4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java
@@ -18,8 +18,15 @@
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
+import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
+import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
-public interface TaskChannelFactory extends UiChannelFactory {
+public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {
TaskChannel create();
+
+ default SPIIdentify getIdentify() {
+ return SPIIdentify.builder().name(getName()).build();
+ }
+
}
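
As a usage illustration of the default `getIdentify()` methods added in this patch, the sketch below shows how a replacement plugin might outrank a built-in one that shares its name. All types here (`Identify`, `ChannelFactory`, `BuiltInEmailFactory`, `CustomEmailFactory`, `pick`) are hypothetical stand-ins so the example compiles without DolphinScheduler or Lombok on the classpath. In a real plugin you would presumably override `getIdentify()` on an `AlertChannelFactory`, `DataSourceChannelFactory`, or `TaskChannelFactory` and set the priority through `SPIIdentify.builder()`. The priority value 10 is arbitrary.

```java
public class OverridePrioritySketch {

    /** Hypothetical stand-in for SPIIdentify. */
    static final class Identify {
        final String name;
        final int priority;

        Identify(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Hypothetical stand-in for a factory interface that extends PrioritySPI. */
    interface ChannelFactory {
        String name();

        // Mirrors the default in the patch: identify is built from the name, priority defaults to 0.
        default Identify getIdentify() {
            return new Identify(name(), 0);
        }
    }

    /** Built-in plugin that relies on the default priority of 0. */
    static final class BuiltInEmailFactory implements ChannelFactory {
        @Override
        public String name() {
            return "Email";
        }
    }

    /** Replacement plugin: same name, but overrides getIdentify() to claim a higher priority. */
    static final class CustomEmailFactory implements ChannelFactory {
        @Override
        public String name() {
            return "Email";
        }

        @Override
        public Identify getIdentify() {
            return new Identify(name(), 10);
        }
    }

    /** Picks between two factories assumed to share a name; equal priorities are rejected. */
    static ChannelFactory pick(ChannelFactory first, ChannelFactory second) {
        int firstPriority = first.getIdentify().priority;
        int secondPriority = second.getIdentify().priority;
        if (firstPriority == secondPriority) {
            throw new IllegalArgumentException("Same name and same priority: " + first.name());
        }
        return firstPriority > secondPriority ? first : second;
    }

    public static void main(String[] args) {
        ChannelFactory winner = pick(new BuiltInEmailFactory(), new CustomEmailFactory());
        System.out.println(winner.getClass().getSimpleName());
    }
}
```

Keeping the default priority at 0 means existing plugins need no changes, and only a plugin that intends to replace another has to override `getIdentify()`.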