You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/23 06:27:35 UTC

[dolphinscheduler] branch dev updated: [Improvement][TaskInstance] reduce database queries (#11522)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4893bef5a7 [Improvement][TaskInstance] reduce database queries (#11522)
4893bef5a7 is described below

commit 4893bef5a79fc022b74f8c273bd8079517726f35
Author: longtb <67...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:27:26 2022 +0800

    [Improvement][TaskInstance] reduce database queries (#11522)
    
    * [Improvement][TaskInstance] reduce database queries
    
    * Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
    
    * [Improvement][TaskInstance] queryByInstanceIdsAndCodes -> queryByProcessInstanceIdsAndTaskCodes
    
    Co-authored-by: zhangshunmin <zh...@kezaihui.com>
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 34 ++++++++++------
 .../service/impl/ProcessInstanceServiceImpl.java   | 46 ++++++++++++++--------
 .../dao/mapper/TaskInstanceMapper.java             |  3 ++
 .../dao/mapper/TaskInstanceMapper.xml              | 18 +++++++++
 .../dao/mapper/TaskInstanceMapperTest.java         | 21 ++++++++++
 5 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 1c7b2aba27..9cc12a8f8f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -114,7 +114,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -722,9 +721,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper
                     .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
             if (taskRelationList.size() == processTaskRelationLogList.size()) {
-                Set<ProcessTaskRelationLog> taskRelationSet = taskRelationList.stream().collect(Collectors.toSet());
-                Set<ProcessTaskRelationLog> processTaskRelationLogSet =
-                        processTaskRelationLogList.stream().collect(Collectors.toSet());
+                Set<ProcessTaskRelationLog> taskRelationSet = new HashSet<>(taskRelationList);
+                Set<ProcessTaskRelationLog> processTaskRelationLogSet = new HashSet<>(processTaskRelationLogList);
                 if (taskRelationSet.size() == processTaskRelationLogSet.size()) {
                     taskRelationSet.removeAll(processTaskRelationLogSet);
                     if (!taskRelationSet.isEmpty()) {
@@ -1047,7 +1045,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     @Transactional
     public Map<String, Object> importProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
-        Map<String, Object> result = new HashMap<>();
+        Map<String, Object> result;
         String dagDataScheduleJson = FileUtils.file2String(file);
         List<DagDataSchedule> dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class);
         Project project = projectMapper.queryByCode(projectCode);
@@ -1658,7 +1656,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      */
     @Override
     public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
-        Map<String, Object> result = new HashMap<>();
+        Map<String, Object> result;
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
         result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
@@ -1716,9 +1714,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
         while (!ServerLifeCycleManager.isStopped()) {
             Set<String> postNodeList;
-            Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
-            while (iter.hasNext()) {
-                Map.Entry<String, List<TreeViewDto>> en = iter.next();
+            Set<Map.Entry<String, List<TreeViewDto>>> entries = runningNodeMap.entrySet();
+            List<Integer> processInstanceIds = processInstanceList.stream()
+                    .limit(limit).map(ProcessInstance::getId).collect(Collectors.toList());
+            List<Long> nodeCodes = entries.stream().map(e -> Long.parseLong(e.getKey())).collect(Collectors.toList());
+            List<TaskInstance> taskInstances;
+            if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) {
+                taskInstances = Collections.emptyList();
+            } else {
+                taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds, nodeCodes);
+            }
+            for (Map.Entry<String, List<TreeViewDto>> en : entries) {
                 String nodeCode = en.getKey();
                 parentTreeViewDtoList = en.getValue();
 
@@ -1730,8 +1736,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 // set treeViewDto instances
                 for (int i = limit - 1; i >= 0; i--) {
                     ProcessInstance processInstance = processInstanceList.get(i);
-                    TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
-                            Long.parseLong(nodeCode));
+                    TaskInstance taskInstance = null;
+                    for (TaskInstance instance : taskInstances) {
+                        if (instance.getTaskCode() == Long.parseLong(nodeCode)
+                                && instance.getProcessInstanceId() == processInstance.getId()) {
+                            taskInstance = instance;
+                            break;
+                        }
+                    }
                     if (taskInstance == null) {
                         treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
                     } else {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 833a734377..67ff0edb9b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -780,24 +780,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         ganttDto.setTaskNames(nodeList);
 
         List<Task> taskList = new ArrayList<>();
-        for (String node : nodeList) {
-            TaskInstance taskInstance =
-                    taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node));
-            if (taskInstance == null) {
-                continue;
+        if (!nodeList.isEmpty()) {
+            List<Long> taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList());
+            List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+                    Collections.singletonList(processInstanceId), taskCodes
+            );
+            for (String node : nodeList) {
+                TaskInstance taskInstance = null;
+                for (TaskInstance instance : taskInstances) {
+                    if (instance.getProcessInstanceId() == processInstanceId
+                            && instance.getTaskCode() == Long.parseLong(node)) {
+                        taskInstance = instance;
+                        break;
+                    }
+                }
+                if (taskInstance == null) {
+                    continue;
+                }
+                Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
+                Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
+                Task task = new Task();
+                task.setTaskName(taskInstance.getName());
+                task.getStartDate().add(startTime.getTime());
+                task.getEndDate().add(endTime.getTime());
+                task.setIsoStart(startTime);
+                task.setIsoEnd(endTime);
+                task.setStatus(taskInstance.getState().toString());
+                task.setExecutionDate(taskInstance.getStartTime());
+                task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
+                taskList.add(task);
             }
-            Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
-            Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
-            Task task = new Task();
-            task.setTaskName(taskInstance.getName());
-            task.getStartDate().add(startTime.getTime());
-            task.getEndDate().add(endTime.getTime());
-            task.setIsoStart(startTime);
-            task.setIsoEnd(endTime);
-            task.setStatus(taskInstance.getState().toString());
-            task.setExecutionDate(taskInstance.getStartTime());
-            task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
-            taskList.add(task);
         }
         ganttDto.setTasks(taskList);
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index ca68dc57e5..2ab78f5151 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -53,6 +53,9 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
     TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
                                           @Param("taskCode") Long taskCode);
 
+    List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
+                                                  @Param("taskCodes") List<Long> taskCodes);
+
     Integer countTask(@Param("projectCodes") Long[] projectCodes,
                       @Param("taskIds") int[] taskIds);
 
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index e09b8aa887..0ba80db75f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -134,6 +134,24 @@
         and flag = 1
         limit 1
     </select>
+    <select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_instance
+        where flag = 1
+        <if test="processInstanceIds != null and processInstanceIds.size() != 0">
+            and process_instance_id in
+            <foreach collection="processInstanceIds" index="index" item="i" open="(" separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+        <if test="taskCodes != null and taskCodes.size() != 0">
+            and task_code in
+            <foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
+                #{i}
+            </foreach>
+        </if>
+    </select>
     <select id="countTask" resultType="java.lang.Integer">
         select count(1) as count
         from t_ds_task_instance task,t_ds_task_definition_log define
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index fa5a256caa..014a2a2c3c 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
@@ -278,6 +279,26 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
         Assert.assertNotEquals(taskInstance, null);
     }
 
+    /**
+     * test query by process instance ids and task codes
+     */
+    @Test
+    public void testQueryByProcessInstanceIdsAndTaskCodes() {
+        // insert ProcessInstance
+        ProcessInstance processInstance = insertProcessInstance();
+
+        // insert taskInstance
+        TaskInstance task = insertTaskInstance(processInstance.getId());
+        task.setHost("111.111.11.11");
+        taskInstanceMapper.updateById(task);
+
+        List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+                Collections.singletonList(task.getProcessInstanceId()),
+                Collections.singletonList(task.getTaskCode()));
+        taskInstanceMapper.deleteById(task.getId());
+        Assert.assertEquals(taskInstances.size(), 1);
+    }
+
     /**
      * test count task instance
      */