You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/08/23 06:27:35 UTC
[dolphinscheduler] branch dev updated: [Improvement][TaskInstance] reduce database queries (#11522)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4893bef5a7 [Improvement][TaskInstance] reduce database queries (#11522)
4893bef5a7 is described below
commit 4893bef5a79fc022b74f8c273bd8079517726f35
Author: longtb <67...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:27:26 2022 +0800
[Improvement][TaskInstance] reduce database queries (#11522)
* [Improvement][TaskInstance] reduce database queries
* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
Co-authored-by: caishunfeng <ca...@gmail.com>
* [Improvement][TaskInstance] queryByInstanceIdsAndCodes -> queryByProcessInstanceIdsAndTaskCodes
Co-authored-by: zhangshunmin <zh...@kezaihui.com>
Co-authored-by: caishunfeng <ca...@gmail.com>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 34 ++++++++++------
.../service/impl/ProcessInstanceServiceImpl.java | 46 ++++++++++++++--------
.../dao/mapper/TaskInstanceMapper.java | 3 ++
.../dao/mapper/TaskInstanceMapper.xml | 18 +++++++++
.../dao/mapper/TaskInstanceMapperTest.java | 21 ++++++++++
5 files changed, 94 insertions(+), 28 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 1c7b2aba27..9cc12a8f8f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -114,7 +114,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -722,9 +721,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
if (taskRelationList.size() == processTaskRelationLogList.size()) {
- Set<ProcessTaskRelationLog> taskRelationSet = taskRelationList.stream().collect(Collectors.toSet());
- Set<ProcessTaskRelationLog> processTaskRelationLogSet =
- processTaskRelationLogList.stream().collect(Collectors.toSet());
+ Set<ProcessTaskRelationLog> taskRelationSet = new HashSet<>(taskRelationList);
+ Set<ProcessTaskRelationLog> processTaskRelationLogSet = new HashSet<>(processTaskRelationLogList);
if (taskRelationSet.size() == processTaskRelationLogSet.size()) {
taskRelationSet.removeAll(processTaskRelationLogSet);
if (!taskRelationSet.isEmpty()) {
@@ -1047,7 +1045,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Override
@Transactional
public Map<String, Object> importProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result;
String dagDataScheduleJson = FileUtils.file2String(file);
List<DagDataSchedule> dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class);
Project project = projectMapper.queryByCode(projectCode);
@@ -1658,7 +1656,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
@Override
public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result;
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
@@ -1716,9 +1714,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
while (!ServerLifeCycleManager.isStopped()) {
Set<String> postNodeList;
- Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, List<TreeViewDto>> en = iter.next();
+ Set<Map.Entry<String, List<TreeViewDto>>> entries = runningNodeMap.entrySet();
+ List<Integer> processInstanceIds = processInstanceList.stream()
+ .limit(limit).map(ProcessInstance::getId).collect(Collectors.toList());
+ List<Long> nodeCodes = entries.stream().map(e -> Long.parseLong(e.getKey())).collect(Collectors.toList());
+ List<TaskInstance> taskInstances;
+ if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) {
+ taskInstances = Collections.emptyList();
+ } else {
+ taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds, nodeCodes);
+ }
+ for (Map.Entry<String, List<TreeViewDto>> en : entries) {
String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
@@ -1730,8 +1736,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
- TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
- Long.parseLong(nodeCode));
+ TaskInstance taskInstance = null;
+ for (TaskInstance instance : taskInstances) {
+ if (instance.getTaskCode() == Long.parseLong(nodeCode)
+ && instance.getProcessInstanceId() == processInstance.getId()) {
+ taskInstance = instance;
+ break;
+ }
+ }
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 833a734377..67ff0edb9b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -780,24 +780,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ganttDto.setTaskNames(nodeList);
List<Task> taskList = new ArrayList<>();
- for (String node : nodeList) {
- TaskInstance taskInstance =
- taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node));
- if (taskInstance == null) {
- continue;
+ if (!nodeList.isEmpty()) {
+ List<Long> taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList());
+ List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+ Collections.singletonList(processInstanceId), taskCodes
+ );
+ for (String node : nodeList) {
+ TaskInstance taskInstance = null;
+ for (TaskInstance instance : taskInstances) {
+ if (instance.getProcessInstanceId() == processInstanceId
+ && instance.getTaskCode() == Long.parseLong(node)) {
+ taskInstance = instance;
+ break;
+ }
+ }
+ if (taskInstance == null) {
+ continue;
+ }
+ Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
+ Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
+ Task task = new Task();
+ task.setTaskName(taskInstance.getName());
+ task.getStartDate().add(startTime.getTime());
+ task.getEndDate().add(endTime.getTime());
+ task.setIsoStart(startTime);
+ task.setIsoEnd(endTime);
+ task.setStatus(taskInstance.getState().toString());
+ task.setExecutionDate(taskInstance.getStartTime());
+ task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
+ taskList.add(task);
}
- Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
- Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
- Task task = new Task();
- task.setTaskName(taskInstance.getName());
- task.getStartDate().add(startTime.getTime());
- task.getEndDate().add(endTime.getTime());
- task.setIsoStart(startTime);
- task.setIsoEnd(endTime);
- task.setStatus(taskInstance.getState().toString());
- task.setExecutionDate(taskInstance.getStartTime());
- task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
- taskList.add(task);
}
ganttDto.setTasks(taskList);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index ca68dc57e5..2ab78f5151 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -53,6 +53,9 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
+ List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
+ @Param("taskCodes") List<Long> taskCodes);
+
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index e09b8aa887..0ba80db75f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -134,6 +134,24 @@
and flag = 1
limit 1
</select>
+ <select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_instance
+ where flag = 1
+ <if test="processInstanceIds != null and processInstanceIds.size() != 0">
+ and process_instance_id in
+ <foreach collection="processInstanceIds" index="index" item="i" open="(" separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ <if test="taskCodes != null and taskCodes.size() != 0">
+ and task_code in
+ <foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ </select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index fa5a256caa..014a2a2c3c 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -278,6 +279,26 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assert.assertNotEquals(taskInstance, null);
}
+ /**
+ * test query by process instance ids and task codes
+ */
+ @Test
+ public void testQueryByProcessInstanceIdsAndTaskCodes() {
+ // insert ProcessInstance
+ ProcessInstance processInstance = insertProcessInstance();
+
+ // insert taskInstance
+ TaskInstance task = insertTaskInstance(processInstance.getId());
+ task.setHost("111.111.11.11");
+ taskInstanceMapper.updateById(task);
+
+ List<TaskInstance> taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+ Collections.singletonList(task.getProcessInstanceId()),
+ Collections.singletonList(task.getTaskCode()));
+ taskInstanceMapper.deleteById(task.getId());
+ Assert.assertEquals(taskInstances.size(), 1);
+ }
+
/**
* test count task instance
*/