You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/20 09:59:05 UTC
[dolphinscheduler] branch dev updated: [Fix-7146][server]Fix the task processor thread is not safe. (#7488)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b23f756 [Fix-7146][server]Fix the task processor thread is not safe. (#7488)
b23f756 is described below
commit b23f756c229ffa17508780f63f8e70f471ec3264
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Dec 20 17:58:57 2021 +0800
[Fix-7146][server]Fix the task processor thread is not safe. (#7488)
* Fix the task processor not being thread-safe.
* fix code style
---
.../master/runner/MasterSchedulerService.java | 9 +-----
.../master/runner/WorkflowExecuteThread.java | 12 ++------
.../master/runner/task/BaseTaskProcessor.java | 5 ++--
.../runner/task/CommonTaskProcessFactory.java} | 27 +++++++----------
.../master/runner/task/CommonTaskProcessor.java | 8 +----
.../runner/task/ConditionTaskProcessFactory.java} | 28 +++++++-----------
.../master/runner/task/ConditionTaskProcessor.java | 8 ++---
.../runner/task/DependentTaskProcessFactory.java} | 27 +++++++----------
.../master/runner/task/DependentTaskProcessor.java | 8 ++---
.../master/runner/task/ITaskProcessFactory.java} | 22 ++------------
.../master/runner/task/SubTaskProcessFactory.java} | 28 +++++++-----------
.../master/runner/task/SubTaskProcessor.java | 10 ++-----
.../runner/task/SwitchTaskProcessFactory.java} | 27 +++++++----------
.../master/runner/task/SwitchTaskProcessor.java | 8 +++--
.../master/runner/task/TaskProcessorFactory.java | 34 ++++++++++------------
.../server/master/WorkflowExecuteThreadTest.java | 2 +-
.../runner/task/TaskProcessorFactoryTest.java | 3 +-
17 files changed, 93 insertions(+), 173 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index f3cdb4b..ff30f28 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -65,12 +65,6 @@ public class MasterSchedulerService extends Thread {
private ProcessService processService;
/**
- * task processor factory
- */
- @Autowired
- private TaskProcessorFactory taskProcessorFactory;
-
- /**
* master config
*/
@Autowired
@@ -176,8 +170,7 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
- , stateWheelExecuteThread
- , taskProcessorFactory);
+ , stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c20ddf7..9c5eee7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -134,11 +134,6 @@ public class WorkflowExecuteThread {
private ProcessDefinition processDefinition;
/**
- * task processor
- */
- private TaskProcessorFactory taskProcessorFactory;
-
- /**
* the object of DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> dag;
@@ -227,22 +222,19 @@ public class WorkflowExecuteThread {
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
- * @param taskProcessorFactory taskProcessorFactory
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
- , StateWheelExecuteThread stateWheelExecuteThread
- , TaskProcessorFactory taskProcessorFactory) {
+ , StateWheelExecuteThread stateWheelExecuteThread) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
- this.taskProcessorFactory = taskProcessorFactory;
}
/**
@@ -805,7 +797,7 @@ public class WorkflowExecuteThread {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
- ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+ ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index ceefa26..0b3d96b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -61,7 +62,6 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
@@ -80,8 +80,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
- @Autowired
- protected ProcessService processService;
+ protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);;
/**
* pause task, common tasks do not need this.
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
index ffef87c..4884650 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
@@ -17,25 +17,20 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.Constants;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class CommonTaskProcessFactory implements ITaskProcessFactory {
+ @Override
+ public String type() {
+ return Constants.COMMON_TASK_TYPE;
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
}
+ @Override
+ public ITaskProcessor create() {
+ return new CommonTaskProcessor();
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index f7053ff..f35127a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -38,20 +38,14 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
/**
* common task processor
*/
-@Service
public class CommonTaskProcessor extends BaseTaskProcessor {
- @Autowired
private TaskPriorityQueue taskUpdateQueue;
- @Autowired
- NettyExecutorManager nettyExecutorManager;
+ private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
index ffef87c..3028c56 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
@@ -17,25 +17,19 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
-@Ignore
-public class TaskProcessorFactoryTest {
-
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
+@AutoService(ITaskProcessFactory.class)
+public class ConditionTaskProcessFactory implements ITaskProcessFactory {
+ @Override
+ public String type() {
+ return TaskType.CONDITIONS.getDesc();
}
+ @Override
+ public ITaskProcessor create() {
+ return new ConditionTaskProcessor();
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index bf2390a..594fa29 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -39,13 +40,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
/**
* condition task processor
*/
-@Service
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
@@ -65,8 +62,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
- @Autowired
- private MasterConfig masterConfig;
+ private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
private TaskDefinition taskDefinition;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
index ffef87c..3f885ed 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
@@ -17,25 +17,20 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class DependentTaskProcessFactory implements ITaskProcessFactory {
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
+ @Override
+ public String type() {
+ return TaskType.DEPENDENT.getDesc();
}
+ @Override
+ public ITaskProcessor create() {
+ return new DependentTaskProcessor();
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 6d311ee..922ecca 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -40,15 +41,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* dependent task processor
*/
-@Service
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
@@ -75,8 +72,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
ProcessInstance processInstance;
TaskDefinition taskDefinition;
- @Autowired
- private MasterConfig masterConfig;
+ private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
boolean allDependentItemFinished;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
similarity index 61%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
index ffef87c..ffbbafb 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
@@ -17,25 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+public interface ITaskProcessFactory {
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore
-public class TaskProcessorFactoryTest {
-
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
- }
+ String type();
+ ITaskProcessor create();
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
index ffef87c..439d8e1 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
@@ -17,25 +17,19 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
-@Ignore
-public class TaskProcessorFactoryTest {
-
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
+@AutoService(ITaskProcessFactory.class)
+public class SubTaskProcessFactory implements ITaskProcessFactory {
+ @Override
+ public String type() {
+ return TaskType.SUB_PROCESS.getDesc();
}
+ @Override
+ public ITaskProcessor create() {
+ return new SubTaskProcessor();
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 0f8ccbc..ed47063 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -25,18 +25,15 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
/**
- *
+ * subtask processor
*/
-@Service
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
@@ -49,8 +46,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
- @Autowired
- private StateEventCallbackService stateEventCallbackService;
+ private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
index ffef87c..d536e65 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
@@ -17,25 +17,20 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class SwitchTaskProcessFactory implements ITaskProcessFactory {
- private TaskProcessorFactory taskProcessorFactory;
- @Test
- public void testFactory() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setTaskType("shell");
-
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
- Assert.assertNotNull(iTaskProcessor);
+ @Override
+ public String type() {
+ return TaskType.SWITCH.getDesc();
}
+ @Override
+ public ITaskProcessor create() {
+ return new SwitchTaskProcessor();
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index a4259ce..e162613 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -46,7 +47,9 @@ import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-@Service
+/**
+ * switch task processor
+ */
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@@ -56,8 +59,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
- @Autowired
- private MasterConfig masterConfig;
+ private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
/**
* switch result
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 09e8bf2..4b20848 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,39 +21,35 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
/**
* the factory to create task processor
*/
-@Service
public class TaskProcessorFactory {
- private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
+ public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
- private Map<String, ITaskProcessor> taskProcessorMap;
+ private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
- @Autowired
- public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
- taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2));
+ static {
+ for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
+ PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
+ }
}
- public ITaskProcessor getTaskProcessor(String key) {
- if (StringUtils.isEmpty(key)) {
- key = DEFAULT_PROCESSOR;
+ public static ITaskProcessor getTaskProcessor(String type) {
+ if (StringUtils.isEmpty(type)) {
+ type = DEFAULT_PROCESSOR;
}
- ITaskProcessor taskProcessor = taskProcessorMap.get(key);
- if (Objects.isNull(taskProcessor)) {
- taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
+ ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type);
+ if (Objects.isNull(taskProcessFactory)) {
+ taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR);
}
- return taskProcessor;
+ return taskProcessFactory.create();
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 8db1bd9..8f2572d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -111,7 +111,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
- workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory));
+ workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index ffef87c..4114a7a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -26,14 +26,13 @@ import org.junit.Test;
@Ignore
public class TaskProcessorFactoryTest {
- private TaskProcessorFactory taskProcessorFactory;
@Test
public void testFactory() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
- ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+ ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
Assert.assertNotNull(iTaskProcessor);
}