You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/20 09:59:05 UTC

[dolphinscheduler] branch dev updated: [Fix-7146][server]Fix the task processor thread is not safe. (#7488)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b23f756  [Fix-7146][server]Fix the task processor thread is not safe. (#7488)
b23f756 is described below

commit b23f756c229ffa17508780f63f8e70f471ec3264
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Dec 20 17:58:57 2021 +0800

    [Fix-7146][server]Fix the task processor thread is not safe. (#7488)
    
    * Fix the task processor not being thread-safe.
    
    * fix code style
---
 .../master/runner/MasterSchedulerService.java      |  9 +-----
 .../master/runner/WorkflowExecuteThread.java       | 12 ++------
 .../master/runner/task/BaseTaskProcessor.java      |  5 ++--
 .../runner/task/CommonTaskProcessFactory.java}     | 27 +++++++----------
 .../master/runner/task/CommonTaskProcessor.java    |  8 +----
 .../runner/task/ConditionTaskProcessFactory.java}  | 28 +++++++-----------
 .../master/runner/task/ConditionTaskProcessor.java |  8 ++---
 .../runner/task/DependentTaskProcessFactory.java}  | 27 +++++++----------
 .../master/runner/task/DependentTaskProcessor.java |  8 ++---
 .../master/runner/task/ITaskProcessFactory.java}   | 22 ++------------
 .../master/runner/task/SubTaskProcessFactory.java} | 28 +++++++-----------
 .../master/runner/task/SubTaskProcessor.java       | 10 ++-----
 .../runner/task/SwitchTaskProcessFactory.java}     | 27 +++++++----------
 .../master/runner/task/SwitchTaskProcessor.java    |  8 +++--
 .../master/runner/task/TaskProcessorFactory.java   | 34 ++++++++++------------
 .../server/master/WorkflowExecuteThreadTest.java   |  2 +-
 .../runner/task/TaskProcessorFactoryTest.java      |  3 +-
 17 files changed, 93 insertions(+), 173 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index f3cdb4b..ff30f28 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -65,12 +65,6 @@ public class MasterSchedulerService extends Thread {
     private ProcessService processService;
 
     /**
-     * task processor factory
-     */
-    @Autowired
-    private TaskProcessorFactory taskProcessorFactory;
-
-    /**
      * master config
      */
     @Autowired
@@ -176,8 +170,7 @@ public class MasterSchedulerService extends Thread {
                     , nettyExecutorManager
                     , processAlertManager
                     , masterConfig
-                    , stateWheelExecuteThread
-                    , taskProcessorFactory);
+                    , stateWheelExecuteThread);
 
             this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
             if (processInstance.getTimeout() > 0) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index c20ddf7..9c5eee7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -134,11 +134,6 @@ public class WorkflowExecuteThread {
     private ProcessDefinition processDefinition;
 
     /**
-     * task processor
-     */
-    private TaskProcessorFactory taskProcessorFactory;
-
-    /**
      * the object of DAG
      */
     private DAG<String, TaskNode, TaskNodeRelation> dag;
@@ -227,22 +222,19 @@ public class WorkflowExecuteThread {
      * @param processAlertManager processAlertManager
      * @param masterConfig masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
-     * @param taskProcessorFactory taskProcessorFactory
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
             , ProcessService processService
             , NettyExecutorManager nettyExecutorManager
             , ProcessAlertManager processAlertManager
             , MasterConfig masterConfig
-            , StateWheelExecuteThread stateWheelExecuteThread
-            , TaskProcessorFactory taskProcessorFactory) {
+            , StateWheelExecuteThread stateWheelExecuteThread) {
         this.processService = processService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
-        this.taskProcessorFactory = taskProcessorFactory;
     }
 
     /**
@@ -805,7 +797,7 @@ public class WorkflowExecuteThread {
      */
     private TaskInstance submitTaskExec(TaskInstance taskInstance) {
         try {
-            ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+            ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
             if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
                     && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index ceefa26..0b3d96b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -61,7 +62,6 @@ import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Strings;
@@ -80,8 +80,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected ProcessInstance processInstance;
 
-    @Autowired
-    protected ProcessService processService;
+    protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);;
 
     /**
      * pause task, common tasks donot need this.
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
index ffef87c..4884650 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
@@ -17,25 +17,20 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.Constants;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
 
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class CommonTaskProcessFactory implements ITaskProcessFactory {
+    @Override
+    public String type() {
+        return Constants.COMMON_TASK_TYPE;
 
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
     }
 
+    @Override
+    public ITaskProcessor create() {
+        return new CommonTaskProcessor();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index f7053ff..f35127a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -38,20 +38,14 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.Date;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 /**
  * common task processor
  */
-@Service
 public class CommonTaskProcessor extends BaseTaskProcessor {
 
-    @Autowired
     private TaskPriorityQueue taskUpdateQueue;
 
-    @Autowired
-    NettyExecutorManager nettyExecutorManager;
+    private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
 
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
index ffef87c..3028c56 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
@@ -17,25 +17,19 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
 
-@Ignore
-public class TaskProcessorFactoryTest {
-
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
+@AutoService(ITaskProcessFactory.class)
+public class ConditionTaskProcessFactory implements ITaskProcessFactory {
+    @Override
+    public String type() {
+        return TaskType.CONDITIONS.getDesc();
     }
 
+    @Override
+    public ITaskProcessor create() {
+        return new ConditionTaskProcessor();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index bf2390a..594fa29 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -39,13 +40,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 /**
  * condition task processor
  */
-@Service
 public class ConditionTaskProcessor extends BaseTaskProcessor {
 
     /**
@@ -65,8 +62,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
      */
     private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
 
-    @Autowired
-    private MasterConfig masterConfig;
+    private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
 
     private TaskDefinition taskDefinition;
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
index ffef87c..3f885ed 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
@@ -17,25 +17,20 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
 
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class DependentTaskProcessFactory implements ITaskProcessFactory {
 
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
+    @Override
+    public String type() {
+        return TaskType.DEPENDENT.getDesc();
     }
 
+    @Override
+    public ITaskProcessor create() {
+        return new DependentTaskProcessor();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 6d311ee..922ecca 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -40,15 +41,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 import com.fasterxml.jackson.annotation.JsonFormat;
 
 /**
  * dependent task processor
  */
-@Service
 public class DependentTaskProcessor extends BaseTaskProcessor {
 
     private DependentParameters dependentParameters;
@@ -75,8 +72,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    @Autowired
-    private MasterConfig masterConfig;
+    private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
 
     boolean allDependentItemFinished;
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
similarity index 61%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
index ffef87c..ffbbafb 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
@@ -17,25 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+public interface ITaskProcessFactory {
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore
-public class TaskProcessorFactoryTest {
-
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
-    }
+    String type();
 
+    ITaskProcessor create();
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
index ffef87c..439d8e1 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
@@ -17,25 +17,19 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
 
-@Ignore
-public class TaskProcessorFactoryTest {
-
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
+@AutoService(ITaskProcessFactory.class)
+public class SubTaskProcessFactory implements ITaskProcessFactory {
+    @Override
+    public String type() {
+        return TaskType.SUB_PROCESS.getDesc();
     }
 
+    @Override
+    public ITaskProcessor create() {
+        return new SubTaskProcessor();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 0f8ccbc..ed47063 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -25,18 +25,15 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 /**
- *
+ * subtask processor
  */
-@Service
 public class SubTaskProcessor extends BaseTaskProcessor {
 
     private ProcessInstance processInstance;
@@ -49,8 +46,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
      */
     private final Lock runLock = new ReentrantLock();
 
-    @Autowired
-    private StateEventCallbackService stateEventCallbackService;
+    private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);;
 
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
similarity index 62%
copy from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
index ffef87c..d536e65 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
@@ -17,25 +17,20 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import com.google.auto.service.AutoService;
 
-@Ignore
-public class TaskProcessorFactoryTest {
+@AutoService(ITaskProcessFactory.class)
+public class SwitchTaskProcessFactory implements ITaskProcessFactory {
 
-    private TaskProcessorFactory taskProcessorFactory;
-    @Test
-    public void testFactory() {
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setTaskType("shell");
-
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
-
-        Assert.assertNotNull(iTaskProcessor);
+    @Override
+    public String type() {
+        return TaskType.SWITCH.getDesc();
     }
 
+    @Override
+    public ITaskProcessor create() {
+        return new SwitchTaskProcessor();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index a4259ce..e162613 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
@@ -46,7 +47,9 @@ import java.util.stream.Collectors;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-@Service
+/**
+ * switch task processor
+ */
 public class SwitchTaskProcessor extends BaseTaskProcessor {
 
     protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@@ -56,8 +59,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     private ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    @Autowired
-    private MasterConfig masterConfig;
+    private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
 
     /**
      * switch result
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 09e8bf2..4b20848 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,39 +21,35 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * the factory to create task processor
  */
-@Service
 public class TaskProcessorFactory {
 
-    private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
+    public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
 
-    private Map<String, ITaskProcessor> taskProcessorMap;
+    private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
 
-    @Autowired
-    public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
-        taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2));
+    static {
+        for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
+            PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
+        }
     }
 
-    public ITaskProcessor getTaskProcessor(String key) {
-        if (StringUtils.isEmpty(key)) {
-            key = DEFAULT_PROCESSOR;
+    public static ITaskProcessor getTaskProcessor(String type) {
+        if (StringUtils.isEmpty(type)) {
+            type = DEFAULT_PROCESSOR;
         }
-        ITaskProcessor taskProcessor = taskProcessorMap.get(key);
-        if (Objects.isNull(taskProcessor)) {
-            taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
+        ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type);
+        if (Objects.isNull(taskProcessFactory)) {
+            taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR);
         }
 
-        return taskProcessor;
+        return taskProcessFactory.create();
     }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 8db1bd9..8f2572d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -111,7 +111,7 @@ public class WorkflowExecuteThreadTest {
         Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
         stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory));
+        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
         // prepareProcess init dag
         Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index ffef87c..4114a7a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -26,14 +26,13 @@ import org.junit.Test;
 @Ignore
 public class TaskProcessorFactoryTest {
 
-    private TaskProcessorFactory taskProcessorFactory;
     @Test
     public void testFactory() {
 
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setTaskType("shell");
 
-        ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+        ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
 
         Assert.assertNotNull(iTaskProcessor);
     }