You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/11/03 01:15:26 UTC

[dolphinscheduler] branch dev updated: [Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7cdb926a5f [Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)
7cdb926a5f is described below

commit 7cdb926a5feb6892635e3467f2e9497d99c6776f
Author: ZhenjiLiu <34...@qq.com>
AuthorDate: Thu Nov 3 09:15:19 2022 +0800

    [Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)
---
 .../dao/repository/ProcessDefinitionDao.java       |  9 +++++++
 .../repository/impl/ProcessDefinitionDaoImpl.java  | 31 ++++++++++++++++++++++
 .../master/service/MasterFailoverService.java      | 18 ++++++++++---
 .../server/master/service/FailoverServiceTest.java |  9 +++++++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
index 0ffa83d2dd..e89adf3367 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
@@ -18,8 +18,11 @@
 package org.apache.dolphinscheduler.dao.repository;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.model.PageListingResult;
 
+import java.util.List;
+
 import javax.annotation.Nullable;
 
 public interface ProcessDefinitionDao {
@@ -37,4 +40,10 @@ public interface ProcessDefinitionDao {
                                                                   int userId,
                                                                   long projectCode);
 
+    /**
+     * query process definitions by definition codes and versions
+     * @param processInstances process instances where codes and version come from
+     * @return
+     */
+    List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
index 561d995251..8a459991ea 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
@@ -18,10 +18,18 @@
 package org.apache.dolphinscheduler.dao.repository.impl;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
@@ -33,6 +41,8 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
 
     @Autowired
     private ProcessDefinitionMapper processDefinitionMapper;
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
 
     @Override
     public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
@@ -48,4 +58,25 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
                 .records(processDefinitions.getRecords())
                 .build();
     }
+
+    @Override
+    public List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances) {
+        if (Objects.isNull(processInstances) || processInstances.isEmpty()) {
+            return new ArrayList<>();
+        }
+        List<ProcessDefinitionLog> processDefinitionLogs = processInstances
+                .parallelStream()
+                .map(processInstance -> {
+                    ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
+                            .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
+                                    processInstance.getProcessDefinitionVersion());
+                    return processDefinitionLog;
+                })
+                .collect(Collectors.toList());
+
+        List<ProcessDefinition> processDefinitions =
+                processDefinitionLogs.stream().map(log -> (ProcessDefinition) log).collect(Collectors.toList());
+
+        return processDefinitions;
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 5d6048c208..df39c9f461 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -48,14 +49,17 @@ import org.apache.commons.lang3.time.StopWatch;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import lombok.NonNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import io.micrometer.core.annotation.Counted;
@@ -78,6 +82,9 @@ public class MasterFailoverService {
 
     private final TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private ProcessDefinitionDao processDefinitionDao;
+
     public MasterFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
                                  @NonNull ProcessService processService,
@@ -153,6 +160,12 @@ public class MasterFailoverService {
                 needFailoverProcessInstanceList.size(),
                 needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
 
+        List<ProcessDefinition> processDefinitions =
+                processDefinitionDao.queryProcessDefinitionsByCodesAndVersions(needFailoverProcessInstanceList);
+        Map<Long, ProcessDefinition> codeDefinitionMap = processDefinitions
+                .stream()
+                .collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
+
         for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
             try {
                 LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
@@ -161,10 +174,7 @@ public class MasterFailoverService {
                     LOGGER.info("WorkflowInstance doesn't need to failover");
                     continue;
                 }
-                // todo: use batch query
-                ProcessDefinition processDefinition =
-                        processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                                processInstance.getProcessDefinitionVersion());
+                ProcessDefinition processDefinition = codeDefinitionMap.get(processInstance.getProcessDefinitionCode());
                 processInstance.setProcessDefinition(processDefinition);
                 int processInstanceId = processInstance.getId();
                 List<TaskInstance> taskInstanceList =
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 2e9dc23936..73b6f5d285 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -42,6 +43,7 @@ import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -56,6 +58,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 import org.springframework.context.ApplicationContext;
+import org.springframework.util.ReflectionUtils;
 
 import com.google.common.collect.Lists;
 
@@ -92,6 +95,9 @@ public class FailoverServiceTest {
     @Mock
     private LogClient logClient;
 
+    @Mock
+    private ProcessDefinitionDao processDefinitionDao;
+
     private static int masterPort = 5678;
     private static int workerPort = 1234;
 
@@ -113,6 +119,9 @@ public class FailoverServiceTest {
         MasterFailoverService masterFailoverService =
                 new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
                         processInstanceExecCacheManager, logClient, taskInstanceDao);
+        Field processDefinitionDaoField = masterFailoverService.getClass().getDeclaredField("processDefinitionDao");
+        processDefinitionDaoField.setAccessible(true);
+        ReflectionUtils.setField(processDefinitionDaoField, masterFailoverService, processDefinitionDao);
         WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
                 masterConfig,
                 processService,