You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/01/16 14:31:35 UTC
[dolphinscheduler] branch dev updated: [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 90ea6eb [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)
90ea6eb is described below
commit 90ea6ebb6f6e5db0e0e444c32da8ccaa180bd459
Author: 天仇 <53...@qq.com>
AuthorDate: Sun Jan 16 22:30:09 2022 +0800
[Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)
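Optimize querying the task instance list when building the DAG flow: recovery task instances are now fetched with a single batch query by id list instead of one query per task id.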
This closes #7907
---
.../master/runner/WorkflowExecuteThread.java | 37 +++++++++-------------
.../server/master/WorkflowExecuteThreadTest.java | 13 +++++---
.../service/process/ProcessService.java | 15 ++++++++-
.../service/process/ProcessServiceTest.java | 18 +++++++++++
4 files changed, 56 insertions(+), 27 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 911439d..73e655f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1528,27 +1528,25 @@ public class WorkflowExecuteThread {
}
/**
- * get recovery task instance
+ * get recovery task instance list
*
- * @param taskId task id
- * @return recovery task instance
+ * @param taskIdArray task id array
+ * @return recovery task instance list
*/
- private TaskInstance getRecoveryTaskInstance(String taskId) {
- if (!StringUtils.isNotEmpty(taskId)) {
- return null;
+ private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
+ if (taskIdArray == null || taskIdArray.length == 0) {
+ return new ArrayList<>();
}
- try {
- Integer intId = Integer.valueOf(taskId);
- TaskInstance task = processService.findTaskInstanceById(intId);
- if (task == null) {
- logger.error("start node id cannot be found: {}", taskId);
- } else {
- return task;
+ List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
+ for (String taskId : taskIdArray) {
+ try {
+ Integer id = Integer.valueOf(taskId);
+ taskIdList.add(id);
+ } catch (Exception e) {
+ logger.error("get recovery task instance failed ", e);
}
- } catch (Exception e) {
- logger.error("get recovery task instance failed ", e);
}
- return null;
+ return processService.findTaskInstanceByIdList(taskIdList);
}
/**
@@ -1564,12 +1562,7 @@ public class WorkflowExecuteThread {
if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
- for (String nodeId : idList) {
- TaskInstance task = getRecoveryTaskInstance(nodeId);
- if (task != null) {
- instanceList.add(task);
- }
- }
+ instanceList = getRecoverTaskInstanceList(idList);
}
return instanceList;
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 1e38938..3d18498 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -154,15 +155,19 @@ public class WorkflowExecuteThreadTest {
taskInstance4.setId(4);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1);
- Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2);
- Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3);
- Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4);
+ Mockito.when(processService.findTaskInstanceByIdList(
+ Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
+ ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
method.setAccessible(true);
List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
Assert.assertEquals(4, taskInstances.size());
+
+ cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "");
+ List<TaskInstance> taskInstanceEmpty = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+ Assert.assertTrue(taskInstanceEmpty.isEmpty());
+
} catch (Exception e) {
Assert.fail();
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a531470..5cebcf7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1549,13 +1549,26 @@ public class ProcessService {
* find task instance by id
*
* @param taskId task id
- * @return task intance
+ * @return task instance
*/
public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}
/**
+ * find task instance list by id list
+ *
+ * @param idList task id list
+ * @return task instance list
+ */
+ public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
+ if (CollectionUtils.isEmpty(idList)) {
+ return new ArrayList<>();
+ }
+ return taskInstanceMapper.selectBatchIds(idList);
+ }
+
+ /**
* package task instance
*/
public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 76b5bad..c7aab24 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -790,6 +791,23 @@ public class ProcessServiceTest {
}
+ @Test
+ public void testFindTaskInstanceByIdList() {
+ List<Integer> emptyList = new ArrayList<>();
+ Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
+ Assert.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
+
+ List<Integer> idList = Collections.singletonList(1);
+ TaskInstance instance = new TaskInstance();
+ instance.setId(1);
+
+ Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
+ List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
+
+ Assert.assertEquals(1, taskInstanceByIdList.size());
+ Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
+ }
+
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");