You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/11 07:35:47 UTC
[incubator-dolphinscheduler] branch dev updated:
[Feature-4138][Master] dispatch workgroup error add sleep time (#4139)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b3120a7 [Feature-4138][Master] dispatch workgroup error add sleep time (#4139)
b3120a7 is described below
commit b3120a74d2656f7ad2054ba8245262551063b549
Author: BoYiZhang <39...@users.noreply.github.com>
AuthorDate: Fri Dec 11 15:35:40 2020 +0800
[Feature-4138][Master] dispatch workgroup error add sleep time (#4139)
* When some tasks fail to be dispatched and the current task queue holds fewer than 10 tasks, sleep for 1 second
* When some tasks fail to be dispatched and the current task queue holds fewer than 10 tasks, sleep for 1 second
* fix code smell & code style
* fix code smell & code style
Co-authored-by: zhanglong <zh...@ysstech.com>
---
.../master/consumer/TaskPriorityQueueConsumer.java | 46 ++-
.../consumer/TaskPriorityQueueConsumerTest.java | 459 +++++++++++++++++++--
2 files changed, 456 insertions(+), 49 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 822e493..89d3e97 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -136,9 +137,17 @@ public class TaskPriorityQueueConsumer extends Thread {
failedDispatchTasks.add(taskPriorityInfo);
}
}
- for (String dispatchFailedTask : failedDispatchTasks) {
- taskPriorityQueue.put(dispatchFailedTask);
+ if (!failedDispatchTasks.isEmpty()) {
+ for (String dispatchFailedTask : failedDispatchTasks) {
+ taskPriorityQueue.put(dispatchFailedTask);
+ }
+ // If the queue now holds nothing but the tasks that just failed to dispatch
+ // (e.g. no worker group was found for them), sleep for 1 second before retrying
+ if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+ TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+ }
}
+
} catch (Exception e) {
logger.error("dispatcher task error", e);
}
@@ -151,7 +160,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* @param taskInstanceId taskInstanceId
* @return result
*/
- private boolean dispatch(int taskInstanceId) {
+ protected boolean dispatch(int taskInstanceId) {
boolean result = false;
try {
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
@@ -224,7 +233,6 @@ public class TaskPriorityQueueConsumer extends Thread {
// SQL task
if (taskType == TaskType.SQL) {
setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
-
}
// DATAX task
@@ -271,22 +279,22 @@ public class TaskPriorityQueueConsumer extends Thread {
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode
*/
- private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+ protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
- DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
- DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+ DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
+ DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
- if (dataSource != null) {
+ if (dbSource != null) {
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
- dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
- dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+ dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
+ dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
}
- if (dataTarget != null) {
+ if (dbTarget != null) {
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
- dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
- dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+ dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
+ dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
}
}
@@ -374,7 +382,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* @param taskInstance taskInstance
* @return result
*/
- private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+ protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
@@ -387,8 +395,8 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* get resource map key is full name and value is tenantCode
*/
- private Map<String, String> getResourceFullNames(TaskNode taskNode) {
- Map<String, String> resourceMap = new HashMap<>();
+ protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
+ Map<String, String> resourcesMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
@@ -400,7 +408,7 @@ public class TaskPriorityQueueConsumer extends Thread {
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach(
- (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+ (t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
);
}
@@ -413,12 +421,12 @@ public class TaskPriorityQueueConsumer extends Thread {
List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach(
- (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
+ (t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
);
}
}
}
- return resourceMap;
+ return resourcesMap;
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index a9c6985..049e30e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,18 +17,22 @@
package org.apache.dolphinscheduler.server.master.consumer;
-import java.util.Date;
-
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -42,7 +46,15 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,9 +63,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
+@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class,
CuratorZookeeperClient.class})
@@ -73,7 +84,7 @@ public class TaskPriorityQueueConsumerTest {
private ExecutorDispatcher dispatcher;
@Before
- public void init(){
+ public void init() {
Tenant tenant = new Tenant();
tenant.setId(1);
@@ -83,12 +94,11 @@ public class TaskPriorityQueueConsumerTest {
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
- Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,2);
+ Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
}
-
@Test
public void testSHELLTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -97,12 +107,31 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou [...]
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,"
+ + "\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
@@ -113,11 +142,14 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+ Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
taskPriorityQueue.put("2_1_2_1_default");
- Thread.sleep(10000);
- }
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
+ }
@Test
public void testSQLTask() throws Exception {
@@ -127,7 +159,13 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"82519315 [...]
+ taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+ + "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+ + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],"
+ + "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+ + "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -149,16 +187,20 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setName("sqlDatasource");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
- dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+ dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
- Thread.sleep(10000);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
public void testDataxTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -167,7 +209,26 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\ [...]
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-97625\","
+ + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+ + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+ + " \\\"postStatements\\\":[],"
+ + " \\\"jobSpeedRecord\\\":1000,"
+ + " \\\"customConfig\\\":0,"
+ + " \\\"dtType\\\":\\\"MYSQL\\\","
+ + " \\\"dsType\\\":\\\"MYSQL\\\","
+ + " \\\"jobSpeedByte\\\":0,"
+ + " \\\"dataSource\\\":80,"
+ + " \\\"dataTarget\\\":80,"
+ + " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+ + " \\\"preStatements\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"DATAX\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -184,21 +245,23 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
-
-
DataSource dataSource = new DataSource();
dataSource.setId(80);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
- dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+ dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
- Thread.sleep(10000);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
public void testSqoopTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@@ -207,7 +270,32 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\ [...]
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-63634\","
+ + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+ + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+ + " \\\"targetType\\\":\\\"HDFS\\\","
+ + " \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+ + " \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+ + " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+ + " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+ + " \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+ + " \\\"modelType\\\":\\\"import\\\","
+ + " \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+ + " \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+ + " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+ + " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+ + " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+ + " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+ + " \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+ + " \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SQOOP\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@@ -224,37 +312,350 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
-
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
- dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
+ dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ + "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ + "\"user\":\"root\","
+ + "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
- Thread.sleep(10000);
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotNull(taskInstance);
}
-
@Test
- public void testTaskInstanceIsFinalState(){
+ public void testTaskInstanceIsFinalState() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
- taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeou [...]
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ + "\"forbidden\":false,\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
+ Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+ Assert.assertNotNull(state);
+ }
+
+ @Test
+ public void testNotFoundWorkerGroup() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+ Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+ Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+ TimeUnit.SECONDS.sleep(10);
+
+ Assert.assertNotNull(taskInstance);
+
+ }
+
+ @Test
+ public void testDispatch() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+ Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+ Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ boolean res = taskPriorityQueueConsumer.dispatch(1);
+
+ Assert.assertFalse(res);
+ }
+
+ @Test
+ public void testGetTaskExecutionContext() throws Exception {
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+ Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
+ Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
+
+ TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
+
+ Assert.assertNotNull(taskExecutionContext);
+ }
+
+ @Test
+ public void testGetResourceFullNames() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+ // task node
+ TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+
+ Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskNode);
+
+ List<Resource> resourcesList = new ArrayList<Resource>();
+ Resource resource = new Resource();
+ resource.setFileName("fileName");
+ resourcesList.add(resource);
+
+ Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
+ Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
+ Assert.assertNotNull(map);
+
+ }
+
+ @Test
+ public void testVerifyTenantIsNull() throws Exception {
+ Tenant tenant = null;
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ taskInstance.setProcessInstance(processInstance);
+
+ boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
+ Assert.assertTrue(res);
+
+ tenant = new Tenant();
+ tenant.setId(1);
+ tenant.setTenantCode("journey");
+ tenant.setDescription("journey");
+ tenant.setQueueId(1);
+ tenant.setCreateTime(new Date());
+ tenant.setUpdateTime(new Date());
+ res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
+ Assert.assertFalse(res);
+
+ }
+
+ @Test
+ public void testSetDataxTaskRelation() throws Exception {
+
+ DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
+ TaskNode taskNode = new TaskNode();
+ taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
+ DataSource dataSource = new DataSource();
+ dataSource.setId(1);
+ dataSource.setConnectionParams("");
+ dataSource.setType(DbType.MYSQL);
+ Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
+
+ taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
+
+ Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
+ Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setTaskType("SHELL");
+ taskInstance.setProcessDefinitionId(1);
+ taskInstance.setProcessInstanceId(1);
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ + "\"conditionsTask\":false,"
+ + "\"depList\":[],"
+ + "\"dependence\":\"{}\","
+ + "\"forbidden\":false,"
+ + "\"id\":\"tasks-55201\","
+ + "\"maxRetryTimes\":0,"
+ + "\"name\":\"测试任务\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ + "\"preTasks\":\"[]\","
+ + "\"retryInterval\":1,"
+ + "\"runFlag\":\"NORMAL\","
+ + "\"taskInstancePriority\":\"MEDIUM\","
+ + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ + "\"timeout\":\"{\\\"enable\\\":false,"
+ + "\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"SHELL\","
+ + "\"workerGroup\":\"NoWorkGroup\"}");
+ taskInstance.setProcessInstancePriority(Priority.MEDIUM);
+ taskInstance.setWorkerGroup("NoWorkGroup");
+ taskInstance.setExecutorId(2);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setTenantId(1);
+ processInstance.setCommandType(CommandType.START_PROCESS);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setUserId(2);
+ processDefinition.setProjectId(1);
+ taskInstance.setProcessDefine(processDefinition);
+
+ Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
- taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
+ taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+
+ taskPriorityQueueConsumer.run();
+
+ TimeUnit.SECONDS.sleep(10);
+ Assert.assertNotEquals(-1,taskPriorityQueue.size());
+
}
@After
@@ -262,6 +663,4 @@ public class TaskPriorityQueueConsumerTest {
Stopper.stop();
}
-
-
}