You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/01/16 14:31:35 UTC

[dolphinscheduler] branch dev updated: [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 90ea6eb  [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)
90ea6eb is described below

commit 90ea6ebb6f6e5db0e0e444c32da8ccaa180bd459
Author: 天仇 <53...@qq.com>
AuthorDate: Sun Jan 16 22:30:09 2022 +0800

    [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)
    
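    Optimize the query for the task instance list when building the DAG flow:
    the recovery start-node task instances are now fetched with a single batch
    query (ProcessService#findTaskInstanceByIdList) instead of one query per task id.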
    This closes #7907
---
 .../master/runner/WorkflowExecuteThread.java       | 37 +++++++++-------------
 .../server/master/WorkflowExecuteThreadTest.java   | 13 +++++---
 .../service/process/ProcessService.java            | 15 ++++++++-
 .../service/process/ProcessServiceTest.java        | 18 +++++++++++
 4 files changed, 56 insertions(+), 27 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 911439d..73e655f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -1528,27 +1528,25 @@ public class WorkflowExecuteThread {
     }
 
     /**
-     * get recovery task instance
+     * get recovery task instance list
      *
-     * @param taskId task id
-     * @return recovery task instance
+     * @param taskIdArray task id array
+     * @return recovery task instance list
      */
-    private TaskInstance getRecoveryTaskInstance(String taskId) {
-        if (!StringUtils.isNotEmpty(taskId)) {
-            return null;
+    private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
+        if (taskIdArray == null || taskIdArray.length == 0) {
+            return new ArrayList<>();
         }
-        try {
-            Integer intId = Integer.valueOf(taskId);
-            TaskInstance task = processService.findTaskInstanceById(intId);
-            if (task == null) {
-                logger.error("start node id cannot be found: {}", taskId);
-            } else {
-                return task;
+        List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
+        for (String taskId : taskIdArray) {
+            try {
+                Integer id = Integer.valueOf(taskId);
+                taskIdList.add(id);
+            } catch (Exception e) {
+                logger.error("get recovery task instance failed ", e);
             }
-        } catch (Exception e) {
-            logger.error("get recovery task instance failed ", e);
         }
-        return null;
+        return processService.findTaskInstanceByIdList(taskIdList);
     }
 
     /**
@@ -1564,12 +1562,7 @@ public class WorkflowExecuteThread {
 
         if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
-            for (String nodeId : idList) {
-                TaskInstance task = getRecoveryTaskInstance(nodeId);
-                if (task != null) {
-                    instanceList.add(task);
-                }
-            }
+            instanceList = getRecoverTaskInstanceList(idList);
         }
         return instanceList;
     }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 1e38938..3d18498 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -154,15 +155,19 @@ public class WorkflowExecuteThreadTest {
             taskInstance4.setId(4);
             Map<String, String> cmdParam = new HashMap<>();
             cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
-            Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1);
-            Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2);
-            Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3);
-            Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4);
+            Mockito.when(processService.findTaskInstanceByIdList(
+                Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
+            ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
             Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
             Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
             method.setAccessible(true);
             List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
             Assert.assertEquals(4, taskInstances.size());
+
+            cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "");
+            List<TaskInstance> taskInstanceEmpty = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+            Assert.assertTrue(taskInstanceEmpty.isEmpty());
+
         } catch (Exception e) {
             Assert.fail();
         }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a531470..5cebcf7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1549,13 +1549,26 @@ public class ProcessService {
      * find task instance by id
      *
      * @param taskId task id
-     * @return task intance
+     * @return task instance
      */
     public TaskInstance findTaskInstanceById(Integer taskId) {
         return taskInstanceMapper.selectById(taskId);
     }
 
     /**
+     * find task instance list by id list
+     *
+     * @param idList task id list
+     * @return task instance list
+     */
+    public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
+        if (CollectionUtils.isEmpty(idList)) {
+            return new ArrayList<>();
+        }
+        return taskInstanceMapper.selectBatchIds(idList);
+    }
+
+    /**
      * package task instance
      */
     public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 76b5bad..c7aab24 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -790,6 +791,23 @@ public class ProcessServiceTest {
 
     }
 
+    @Test
+    public void testFindTaskInstanceByIdList() {
+        List<Integer> emptyList = new ArrayList<>();
+        Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
+        Assert.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
+
+        List<Integer> idList = Collections.singletonList(1);
+        TaskInstance instance = new TaskInstance();
+        instance.setId(1);
+
+        Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
+        List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
+
+        Assert.assertEquals(1, taskInstanceByIdList.size());
+        Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
+    }
+
     private TaskGroupQueue getTaskGroupQueue() {
         TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
         taskGroupQueue.setTaskName("task name");