You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/11 07:35:47 UTC

[incubator-dolphinscheduler] branch dev updated: [Feature-4138][Master] dispatch workgroup error add sleep time (#4139)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b3120a7  [Feature-4138][Master] dispatch workgroup error add sleep time (#4139)
b3120a7 is described below

commit b3120a74d2656f7ad2054ba8245262551063b549
Author: BoYiZhang <39...@users.noreply.github.com>
AuthorDate: Fri Dec 11 15:35:40 2020 +0800

    [Feature-4138][Master] dispatch workgroup error add sleep time (#4139)
    
    * When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second
    
    * When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second
    
    * fix code smell & code style
    
    * fix code smell & code style
    
    Co-authored-by: zhanglong <zh...@ysstech.com>
---
 .../master/consumer/TaskPriorityQueueConsumer.java |  46 ++-
 .../consumer/TaskPriorityQueueConsumerTest.java    | 459 +++++++++++++++++++--
 2 files changed, 456 insertions(+), 49 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 822e493..89d3e97 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -136,9 +137,17 @@ public class TaskPriorityQueueConsumer extends Thread {
                         failedDispatchTasks.add(taskPriorityInfo);
                     }
                 }
-                for (String dispatchFailedTask : failedDispatchTasks) {
-                    taskPriorityQueue.put(dispatchFailedTask);
+                if (!failedDispatchTasks.isEmpty()) {
+                    for (String dispatchFailedTask : failedDispatchTasks) {
+                        taskPriorityQueue.put(dispatchFailedTask);
+                    }
+                    // If there are tasks in a cycle that cannot find the worker group,
+                    // sleep for 1 second
+                    if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+                        TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                    }
                 }
+
             } catch (Exception e) {
                 logger.error("dispatcher task error", e);
             }
@@ -151,7 +160,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param taskInstanceId taskInstanceId
      * @return result
      */
-    private boolean dispatch(int taskInstanceId) {
+    protected boolean dispatch(int taskInstanceId) {
         boolean result = false;
         try {
             TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
@@ -224,7 +233,6 @@ public class TaskPriorityQueueConsumer extends Thread {
         // SQL task
         if (taskType == TaskType.SQL) {
             setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
-
         }
 
         // DATAX task
@@ -271,22 +279,22 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param dataxTaskExecutionContext dataxTaskExecutionContext
      * @param taskNode                  taskNode
      */
-    private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+    protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
         DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
 
-        DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
-        DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+        DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
+        DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
 
-        if (dataSource != null) {
+        if (dbSource != null) {
             dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
-            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
+            dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
         }
 
-        if (dataTarget != null) {
+        if (dbTarget != null) {
             dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
-            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
+            dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
         }
     }
 
@@ -374,7 +382,7 @@ public class TaskPriorityQueueConsumer extends Thread {
      * @param taskInstance taskInstance
      * @return result
      */
-    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+    protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
         if (tenant == null) {
             logger.error("tenant not exists,process instance id : {},task instance id : {}",
                 taskInstance.getProcessInstance().getId(),
@@ -387,8 +395,8 @@ public class TaskPriorityQueueConsumer extends Thread {
     /**
      * get resource map key is full name and value is tenantCode
      */
-    private Map<String, String> getResourceFullNames(TaskNode taskNode) {
-        Map<String, String> resourceMap = new HashMap<>();
+    protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+        Map<String, String> resourcesMap = new HashMap<>();
         AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
@@ -400,7 +408,7 @@ public class TaskPriorityQueueConsumer extends Thread {
                 if (CollectionUtils.isNotEmpty(oldVersionResources)) {
 
                     oldVersionResources.forEach(
-                        (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+                        (t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
                     );
                 }
 
@@ -413,12 +421,12 @@ public class TaskPriorityQueueConsumer extends Thread {
 
                     List<Resource> resources = processService.listResourceByIds(resourceIds);
                     resources.forEach(
-                        (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
+                        (t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
                     );
                 }
             }
         }
 
-        return resourceMap;
+        return resourcesMap;
     }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index a9c6985..049e30e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,18 +17,22 @@
 
 package org.apache.dolphinscheduler.server.master.consumer;
 
-import java.util.Date;
-
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -42,7 +46,15 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,9 +63,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
+@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
         NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
         ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class,
         CuratorZookeeperClient.class})
@@ -73,7 +84,7 @@ public class TaskPriorityQueueConsumerTest {
     private ExecutorDispatcher dispatcher;
 
     @Before
-    public void init(){
+    public void init() {
 
         Tenant tenant = new Tenant();
         tenant.setId(1);
@@ -83,12 +94,11 @@ public class TaskPriorityQueueConsumerTest {
         tenant.setCreateTime(new Date());
         tenant.setUpdateTime(new Date());
 
-        Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,2);
+        Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
 
         Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
     }
 
-
     @Test
     public void testSHELLTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -97,12 +107,31 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou [...]
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,"
+                + "\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
 
         ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
         processInstance.setTenantId(1);
         processInstance.setCommandType(CommandType.START_PROCESS);
         taskInstance.setProcessInstance(processInstance);
@@ -113,11 +142,14 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefine(processDefinition);
 
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
         taskPriorityQueue.put("2_1_2_1_default");
 
-        Thread.sleep(10000);
-    }
+        TimeUnit.SECONDS.sleep(10);
 
+        Assert.assertNotNull(taskInstance);
+    }
 
     @Test
     public void testSQLTask() throws Exception {
@@ -127,7 +159,13 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"82519315 [...]
+        taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+                + "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+                + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],"
+                + "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+                + "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -149,16 +187,20 @@ public class TaskPriorityQueueConsumerTest {
         dataSource.setName("sqlDatasource");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
 
         Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
 
-        Thread.sleep(10000);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
     public void testDataxTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -167,7 +209,26 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\ [...]
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-97625\","
+                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+                + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+                + "    \\\"postStatements\\\":[],"
+                + "    \\\"jobSpeedRecord\\\":1000,"
+                + "    \\\"customConfig\\\":0,"
+                + "    \\\"dtType\\\":\\\"MYSQL\\\","
+                + "    \\\"dsType\\\":\\\"MYSQL\\\","
+                + "    \\\"jobSpeedByte\\\":0,"
+                + "    \\\"dataSource\\\":80,"
+                + "    \\\"dataTarget\\\":80,"
+                + "    \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+                + "    \\\"preStatements\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"DATAX\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -184,21 +245,23 @@ public class TaskPriorityQueueConsumerTest {
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         taskPriorityQueue.put("2_1_2_1_default");
 
-
-
         DataSource dataSource = new DataSource();
         dataSource.setId(80);
         dataSource.setName("datax");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
         Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
-        Thread.sleep(10000);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
     public void testSqoopTask() throws Exception {
         TaskInstance taskInstance = new TaskInstance();
@@ -207,7 +270,32 @@ public class TaskPriorityQueueConsumerTest {
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\ [...]
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-63634\","
+                + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+                + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+                + "    \\\"targetType\\\":\\\"HDFS\\\","
+                + "    \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+                + "        \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+                + "        \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+                + "        \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+                + "    \\\"modelType\\\":\\\"import\\\","
+                + "    \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+                + "        \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+                + "        \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+                + "        \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+                + "        \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+                + "        \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+                + "    \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SQOOP\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
@@ -224,37 +312,350 @@ public class TaskPriorityQueueConsumerTest {
         Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         taskPriorityQueue.put("2_1_2_1_default");
 
-
         DataSource dataSource = new DataSource();
         dataSource.setId(1);
         dataSource.setName("datax");
         dataSource.setType(DbType.MYSQL);
         dataSource.setUserId(2);
-        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+        dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+                + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+                + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+                + "\"user\":\"root\","
+                + "\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
         Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
-        Thread.sleep(10000);
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotNull(taskInstance);
     }
 
-
     @Test
-    public void testTaskInstanceIsFinalState(){
+    public void testTaskInstanceIsFinalState() {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1);
         taskInstance.setTaskType("SHELL");
         taskInstance.setProcessDefinitionId(1);
         taskInstance.setProcessInstanceId(1);
         taskInstance.setState(ExecutionStatus.KILL);
-        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou [...]
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+                + "\"forbidden\":false,\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"default\"}");
         taskInstance.setProcessInstancePriority(Priority.MEDIUM);
         taskInstance.setWorkerGroup("default");
         taskInstance.setExecutorId(2);
 
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+        Assert.assertNotNull(state);
+    }
+
+    @Test
+    public void testNotFoundWorkerGroup() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+        TimeUnit.SECONDS.sleep(10);
+
+        Assert.assertNotNull(taskInstance);
+
+    }
+
+    @Test
+    public void testDispatch() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        boolean res  = taskPriorityQueueConsumer.dispatch(1);
+
+        Assert.assertFalse(res);
+    }
+
+    @Test
+    public void testGetTaskExecutionContext() throws Exception {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+        Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+        TaskExecutionContext taskExecutionContext  = taskPriorityQueueConsumer.getTaskExecutionContext(1);
+
+        Assert.assertNotNull(taskExecutionContext);
+    }
+
+    @Test
+    public void testGetResourceFullNames() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+        // task node
+        TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+
+        Map<String, String>  map = taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+
+        List<Resource> resourcesList = new ArrayList<Resource>();
+        Resource resource = new Resource();
+        resource.setFileName("fileName");
+        resourcesList.add(resource);
+
+        Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
+        Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
+        Assert.assertNotNull(map);
+
+    }
+
+    @Test
+    public void testVerifyTenantIsNull() throws Exception {
+        Tenant tenant = null;
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
 
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        taskInstance.setProcessInstance(processInstance);
+
+        boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
+        Assert.assertTrue(res);
+
+        tenant = new Tenant();
+        tenant.setId(1);
+        tenant.setTenantCode("journey");
+        tenant.setDescription("journey");
+        tenant.setQueueId(1);
+        tenant.setCreateTime(new Date());
+        tenant.setUpdateTime(new Date());
+        res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
+        Assert.assertFalse(res);
+
+    }
+
+    @Test
+    public void testSetDataxTaskRelation() throws Exception {
+
+        DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
+        TaskNode taskNode = new TaskNode();
+        taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+        DataSource dataSource = new DataSource();
+        dataSource.setId(1);
+        dataSource.setConnectionParams("");
+        dataSource.setType(DbType.MYSQL);
+        Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+        taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+
+        Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
+        Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
+    }
+
+    @Test
+    public void testRun() throws Exception {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setTaskType("SHELL");
+        taskInstance.setProcessDefinitionId(1);
+        taskInstance.setProcessInstanceId(1);
+        taskInstance.setState(ExecutionStatus.KILL);
+        taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+                + "\"conditionsTask\":false,"
+                + "\"depList\":[],"
+                + "\"dependence\":\"{}\","
+                + "\"forbidden\":false,"
+                + "\"id\":\"tasks-55201\","
+                + "\"maxRetryTimes\":0,"
+                + "\"name\":\"测试任务\","
+                + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+                + "\"preTasks\":\"[]\","
+                + "\"retryInterval\":1,"
+                + "\"runFlag\":\"NORMAL\","
+                + "\"taskInstancePriority\":\"MEDIUM\","
+                + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+                + "\"timeout\":\"{\\\"enable\\\":false,"
+                + "\\\"strategy\\\":\\\"\\\"}\","
+                + "\"type\":\"SHELL\","
+                + "\"workerGroup\":\"NoWorkGroup\"}");
+        taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+        taskInstance.setWorkerGroup("NoWorkGroup");
+        taskInstance.setExecutorId(2);
+
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setTenantId(1);
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        taskInstance.setProcessInstance(processInstance);
+        taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setUserId(2);
+        processDefinition.setProjectId(1);
+        taskInstance.setProcessDefine(processDefinition);
+
+        Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
         Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
 
-        taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+        taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+        taskPriorityQueueConsumer.run();
+
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertNotEquals(-1,taskPriorityQueue.size());
+
     }
 
     @After
@@ -262,6 +663,4 @@ public class TaskPriorityQueueConsumerTest {
         Stopper.stop();
     }
 
-
-
 }