Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/28 14:34:53 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: 1,add UT in pom 2,refactor TaskUpdateQueue (#2326)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 68cb81f  1,add UT in pom 2,refactor TaskUpdateQueue (#2326)
68cb81f is described below

commit 68cb81fdf5e62c95444f22837545c3c3b8604ba3
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Sat Mar 28 22:34:47 2020 +0800

    1,add UT in pom 2,refactor TaskUpdateQueue (#2326)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros replaced with TaskExecutionContext
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove Chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency; MR, Spark tasks etc. need this
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    * master and worker register ip  use OSUtils.getHost()
    
    * ProcessInstance host set ip:port format
    
    * worker fault tolerance modify
    
    * Constants and .env modify
    
    * master fault tolerant bug modify
    
    * UT add pom.xml
    
    * timing online  modify
    
    * when taskResponse reaches the db faster than taskAck, the task state will be wrong;
    add an async queue and a new thread to resolve this problem
    
    * TaskExecutionContext set host
    
    * 1,TaskManager refactor
    2, api start load server dolphinscheduler-daemon.sh modify
    
    * 1,TaskManager refactor
    2, api start load server dolphinscheduler-daemon.sh modify
    
    * add UT in pom.xml
    
    * revert dolphinscheduler-daemon.sh
    
    * ZookeeperRegister use common.properties zookeeperRoot path
    
    * api start exclude org.apache.dolphinscheduler.server.*
    
    * ZookeeperRegister use common.properties zookeeperRoot path
    
    * 1,api start load server filter
    2,SHELL task exitStatusCode modify
    
    * java doc error modify
    
    * java doc error modify
    
    * remove todo
    
    * add UT in pom
    
    * 1,add UT in pom
    2,refactor TaskUpdateQueue
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../api/controller/ProcessInstanceController.java  |   1 -
 .../api/service/DataAnalysisServiceTest.java       |   2 -
 ...onsumer.java => TaskPriorityQueueConsumer.java} | 107 +++++++++++++--------
 .../master/runner/MasterBaseTaskExecThread.java    |   8 +-
 ...TaskUpdateQueue.java => TaskPriorityQueue.java} |   2 +-
 ...teQueueImpl.java => TaskPriorityQueueImpl.java} |   4 +-
 .../service/queue/TaskQueueFactory.java            |  55 -----------
 .../src/test/java/queue/TaskUpdateQueueTest.java   |   6 +-
 pom.xml                                            |  23 ++---
 9 files changed, 85 insertions(+), 123 deletions(-)
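
Note on the renamed queue: the diff below renames TaskUpdateQueue to TaskPriorityQueue but shows none of the implementation body. The test in this commit puts strings such as "1_1_0_3_default" into the queue. The following is only a minimal sketch of how such underscore-delimited strings could be ordered, not the code of TaskPriorityQueueImpl. It assumes four leading numeric fields followed by a worker-group name, and that smaller numbers come out first. The class name TaskInfoQueueSketch and its methods are hypothetical.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical sketch; NOT the TaskPriorityQueueImpl from this commit. */
final class TaskInfoQueueSketch {

    /** Assumption: four numeric fields, then a worker-group name. */
    private static final int NUMERIC_FIELDS = 4;

    /** Compares task-info strings field by field, numerically, smallest first. */
    static final Comparator<String> BY_NUMERIC_FIELDS = (a, b) -> {
        String[] x = a.split("_");
        String[] y = b.split("_");
        for (int i = 0; i < NUMERIC_FIELDS; i++) {
            int c = Integer.compare(Integer.parseInt(x[i]), Integer.parseInt(y[i]));
            if (c != 0) {
                return c;
            }
        }
        return 0; // all numeric fields equal; worker group is ignored here
    };

    private final PriorityBlockingQueue<String> queue =
            new PriorityBlockingQueue<>(16, BY_NUMERIC_FIELDS);

    /** Adds a task-info string; the queue is unbounded, so this never blocks. */
    void put(String taskInfo) {
        queue.put(taskInfo);
    }

    /** Returns the smallest task-info string, or null if the queue is empty. */
    String poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

Comparing the fields as integers rather than as raw strings keeps "10" after "9"; whether the real implementation does this is not visible in this diff.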

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index b6533ad..2fd332f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
index 10220e2..35cc6ae 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,7 +45,6 @@ import java.util.List;
 import java.util.Map;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskQueueFactory.class})
 public class DataAnalysisServiceTest {
     
     @InjectMocks
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
similarity index 69%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 077b1ef..4aaf901 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,18 +53,18 @@ import java.util.List;
  * TaskUpdateQueue consumer
  */
 @Component
-public class TaskUpdateQueueConsumer extends Thread{
+public class TaskPriorityQueueConsumer extends Thread{
 
     /**
      * logger of TaskUpdateQueueConsumer
      */
-    private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);
 
     /**
      * taskUpdateQueue
      */
     @Autowired
-    private TaskUpdateQueue taskUpdateQueue;
+    private TaskPriorityQueue taskUpdateQueue;
 
     /**
      * processService
@@ -155,52 +155,19 @@ public class TaskUpdateQueueConsumer extends Thread{
         TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
         // SQL task
         if (taskType == TaskType.SQL){
-            SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
-            int datasourceId = sqlParameters.getDatasource();
-            DataSource datasource = processService.findDataSourceById(datasourceId);
-            sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-
-            // whether udf type
-            boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
-                    && StringUtils.isNotEmpty(sqlParameters.getUdfs());
-
-            if (udfTypeFlag){
-                String[] udfFunIds = sqlParameters.getUdfs().split(",");
-                int[] udfFunIdsArray = new int[udfFunIds.length];
-                for(int i = 0 ; i < udfFunIds.length;i++){
-                    udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
-                }
-
-                List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
-                sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
-            }
+            setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
 
         }
 
         // DATAX task
         if (taskType == TaskType.DATAX){
-            DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
-
-            DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
-            DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
-
-
-            dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-
-            dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
         }
 
 
         // procedure task
         if (taskType == TaskType.PROCEDURE){
-            ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
-            int datasourceId = procedureParameters.getDatasource();
-            DataSource datasource = processService.findDataSourceById(datasourceId);
-            procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+            setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
         }
 
 
@@ -216,6 +183,66 @@ public class TaskUpdateQueueConsumer extends Thread{
     }
 
     /**
+     * set procedure task relation
+     * @param procedureTaskExecutionContext procedureTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
+        ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
+        int datasourceId = procedureParameters.getDatasource();
+        DataSource datasource = processService.findDataSourceById(datasourceId);
+        procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+    }
+
+    /**
+     * set datax task relation
+     * @param dataxTaskExecutionContext dataxTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+        DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+
+        DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+        dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+        dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+        dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+        dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+        dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+        dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+    }
+
+    /**
+     * set SQL task relation
+     * @param sqlTaskExecutionContext sqlTaskExecutionContext
+     * @param taskNode taskNode
+     */
+    private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
+        SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
+        int datasourceId = sqlParameters.getDatasource();
+        DataSource datasource = processService.findDataSourceById(datasourceId);
+        sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+        // whether udf type
+        boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+                && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+
+        if (udfTypeFlag){
+            String[] udfFunIds = sqlParameters.getUdfs().split(",");
+            int[] udfFunIdsArray = new int[udfFunIds.length];
+            for(int i = 0 ; i < udfFunIds.length;i++){
+                udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+            }
+
+            List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
+            sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+        }
+    }
+
+    /**
      * get execute local path
      *
      * @return execute local path
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index b0fd632..dd7c564 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.dolphinscheduler.common.Constants.*;
@@ -76,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
     /**
      * taskUpdateQueue
      */
-    private TaskUpdateQueue taskUpdateQueue;
+    private TaskPriorityQueue taskUpdateQueue;
     /**
      * constructor of MasterBaseTaskExecThread
      * @param taskInstance      task instance
@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         this.cancel = false;
         this.taskInstance = taskInstance;
         this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-        this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
+        this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
similarity index 97%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 48f510e..3ad9aef 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -17,7 +17,7 @@
 package org.apache.dolphinscheduler.service.queue;
 
 
-public interface TaskUpdateQueue {
+public interface TaskPriorityQueue {
 
     /**
      * put task info
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
similarity index 96%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 1b3bec7..0a0fb1b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -17,8 +17,6 @@
 package org.apache.dolphinscheduler.service.queue;
 
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -31,7 +29,7 @@ import static org.apache.dolphinscheduler.common.Constants.*;
  * tasks queue implementation
  */
 @Service
-public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue {
     /**
      * queue size
      */
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
deleted file mode 100644
index 3ea3195..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * task queue factory
- */
-public class TaskQueueFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(TaskQueueFactory.class);
-
-
-  private TaskQueueFactory(){
-
-  }
-
-
-  /**
-   * get instance (singleton)
-   *
-   * @return instance
-   */
-  public static TaskUpdateQueue getTaskQueueInstance() {
-    String queueImplValue = CommonUtils.getQueueImplValue();
-    if (StringUtils.isNotBlank(queueImplValue)) {
-        logger.info("task queue impl use zookeeper ");
-        return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
-    }else{
-      logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
-      System.exit(-1);
-    }
-
-    return null;
-  }
-}
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
index a0e4fad..ca6c083 100644
--- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -17,8 +17,8 @@
 
 package queue;
 
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -45,7 +45,7 @@ public class TaskUpdateQueueTest {
         String taskInfo3 = "1_1_0_3_default";
         String taskInfo4 = "1_1_0_4_default";
 
-        TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+        TaskPriorityQueue queue = new TaskPriorityQueueImpl();
         queue.put(taskInfo1);
         queue.put(taskInfo2);
         queue.put(taskInfo3);
diff --git a/pom.xml b/pom.xml
index e7b95d9..1ba35b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -717,23 +717,18 @@
                         <include>**/dao/mapper/AccessTokenMapperTest.java</include>
                         <include>**/dao/mapper/AlertGroupMapperTest.java</include>
                         <include>**/dao/mapper/AlertMapperTest.java</include>
-                        <include>**/dao/mapper/CommandMapperTest.java</include>
-                        <include>**/dao/cron/CronUtilsTest.java</include>
-                        <include>**/dao/utils/DagHelperTest.java</include>
-                        <include>**/server/worker/task/datax/DataxTaskTest.java</include>
-                        <include>**/server/utils/DataxUtilsTest.java</include>
-                        <include>**/server/utils/SparkArgsUtilsTest.java</include>
-                        <include>**/server/utils/FlinkArgsUtilsTest.java</include>
-                        <include>**/server/utils/ParamUtilsTest.java</include>
+                        <include>**/dao/mapper/DataSourceMapperTest.java</include>
                         <include>**/server/log/MasterLogFilterTest.java</include>
                         <include>**/server/log/SensitiveDataConverterTest.java</include>
                         <include>**/server/log/TaskLogDiscriminatorTest.java</include>
                         <include>**/server/log/TaskLogFilterTest.java</include>
                         <include>**/server/log/WorkerLogFilterTest.java</include>
-                        <include>**/server/master/executor/NettyExecutorManagerTest.java</include>
-                        <include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
-                        <include>**/server/master/host/RandomSelectorTest.java</include>
-                        <include>**/server/master/host/RoundRobinSelectorTest.java</include>
+                        <include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
+                        <include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
+                        <include>**/server/master/dispatch/host/RoundRobinHostManagerTest.java</include>
+                        <include>**/server/master/dispatch/ExecutorDispatcherTest.java</include>
                         <include>**/server/master/register/MasterRegistryTest.java</include>
                         <include>**/server/master/AlertManagerTest.java</include>
                         <include>**/server/master/MasterCommandTest.java</include>
@@ -741,19 +736,19 @@
                         <include>**/server/master/ParamsTest.java</include>
                         <include>**/server/register/ZookeeperNodeManagerTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
+                        <include>**/server/utils/ExecutionContextTestUtils.java</include>
                         <include>**/server/utils/FlinkArgsUtilsTest.java</include>
                         <include>**/server/utils/ParamUtilsTest.java</include>
                         <include>**/server/utils/ProcessUtilsTest.java</include>
                         <include>**/server/utils/SparkArgsUtilsTest.java</include>
                         <include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
-                        <include>**/server/worker/register/WorkerRegistryTest.java</include>
+                        <include>**/server/worker/registry/WorkerRegistryTest.java</include>
                         <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         <include>**/server/worker/sql/SqlExecutorTest.java</include>
                         <include>**/server/worker/task/datax/DataxTaskTest.java</include>
                         <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
                         <include>**/server/worker/task/EnvFileTest.java</include>
-
                     </includes>
                     <!-- <skip>true</skip> -->
                     <argLine>-Xmx2048m</argLine>
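
Note on the ack/response ordering: one bullet in the commit message above says that a task response can be written to the db before the task ack, leaving the task in the wrong state, and that an async queue plus a dedicated thread was added to address this. That code is not part of this diff. The sketch below only illustrates the general idea under stated assumptions: events are enqueued in arrival order and a single worker thread persists them one at a time, so a later response cannot be overwritten by an earlier ack. Every name here (TaskEventQueueSketch, Event, State, the in-memory map standing in for the db) is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of "async queue + one thread"; not code from this commit. */
final class TaskEventQueueSketch {

    enum State { RUNNING, SUCCESS }

    static final class Event {
        final int taskId;
        final State state;

        Event(int taskId, State state) {
            this.taskId = taskId;
            this.state = state;
        }
    }

    /** Sentinel that tells the worker thread to exit. */
    private static final Event STOP = new Event(-1, null);

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    /** Stands in for the db in this sketch. */
    private final Map<Integer, State> store = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::drain, "task-event-worker");

    void start() {
        worker.start();
    }

    /** Called by receiver threads; only enqueues, never touches the store. */
    void submit(Event event) {
        events.add(event);
    }

    /** Asks the worker to finish the queued events, then waits for it. */
    void stopAndWait() {
        events.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    State stateOf(int taskId) {
        return store.get(taskId);
    }

    /** Single consumer: applies events strictly in queue order. */
    private void drain() {
        try {
            for (Event e = events.take(); e != STOP; e = events.take()) {
                store.put(e.taskId, e.state);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

This only helps if the ack is enqueued before the response; it serializes the writes but does not reorder events that arrive out of order.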