You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/11/03 01:15:26 UTC
[dolphinscheduler] branch dev updated: [Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7cdb926a5f [Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)
7cdb926a5f is described below
commit 7cdb926a5feb6892635e3467f2e9497d99c6776f
Author: ZhenjiLiu <34...@qq.com>
AuthorDate: Thu Nov 3 09:15:19 2022 +0800
[Improvement][Batch Query] Batch query ProcessDefinitions belongs to need failover ProcessInstance. (#12506)
---
.../dao/repository/ProcessDefinitionDao.java | 9 +++++++
.../repository/impl/ProcessDefinitionDaoImpl.java | 31 ++++++++++++++++++++++
.../master/service/MasterFailoverService.java | 18 ++++++++++---
.../server/master/service/FailoverServiceTest.java | 9 +++++++
4 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
index 0ffa83d2dd..e89adf3367 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
@@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
+import java.util.List;
+
import javax.annotation.Nullable;
public interface ProcessDefinitionDao {
@@ -37,4 +40,10 @@ public interface ProcessDefinitionDao {
int userId,
long projectCode);
+ /**
+ * query process definitions by definition codes and versions
+ * @param processInstances process instances where codes and version come from
+ * @return
+ */
+ List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
index 561d995251..8a459991ea 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
@@ -18,10 +18,18 @@
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@@ -33,6 +41,8 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override
public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
@@ -48,4 +58,25 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
.records(processDefinitions.getRecords())
.build();
}
+
+ @Override
+ public List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances) {
+ if (Objects.isNull(processInstances) || processInstances.isEmpty()) {
+ return new ArrayList<>();
+ }
+ List<ProcessDefinitionLog> processDefinitionLogs = processInstances
+ .parallelStream()
+ .map(processInstance -> {
+ ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
+ .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+ return processDefinitionLog;
+ })
+ .collect(Collectors.toList());
+
+ List<ProcessDefinition> processDefinitions =
+ processDefinitionLogs.stream().map(log -> (ProcessDefinition) log).collect(Collectors.toList());
+
+ return processDefinitions;
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 5d6048c208..df39c9f461 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -48,14 +49,17 @@ import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.micrometer.core.annotation.Counted;
@@ -78,6 +82,9 @@ public class MasterFailoverService {
private final TaskInstanceDao taskInstanceDao;
+ @Autowired
+ private ProcessDefinitionDao processDefinitionDao;
+
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@@ -153,6 +160,12 @@ public class MasterFailoverService {
needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
+ List<ProcessDefinition> processDefinitions =
+ processDefinitionDao.queryProcessDefinitionsByCodesAndVersions(needFailoverProcessInstanceList);
+ Map<Long, ProcessDefinition> codeDefinitionMap = processDefinitions
+ .stream()
+ .collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
+
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
@@ -161,10 +174,7 @@ public class MasterFailoverService {
LOGGER.info("WorkflowInstance doesn't need to failover");
continue;
}
- // todo: use batch query
- ProcessDefinition processDefinition =
- processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ ProcessDefinition processDefinition = codeDefinitionMap.get(processInstance.getProcessDefinitionCode());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList =
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 2e9dc23936..73b6f5d285 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -42,6 +43,7 @@ import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -56,6 +58,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
+import org.springframework.util.ReflectionUtils;
import com.google.common.collect.Lists;
@@ -92,6 +95,9 @@ public class FailoverServiceTest {
@Mock
private LogClient logClient;
+ @Mock
+ private ProcessDefinitionDao processDefinitionDao;
+
private static int masterPort = 5678;
private static int workerPort = 1234;
@@ -113,6 +119,9 @@ public class FailoverServiceTest {
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient, taskInstanceDao);
+ Field processDefinitionDaoField = masterFailoverService.getClass().getDeclaredField("processDefinitionDao");
+ processDefinitionDaoField.setAccessible(true);
+ ReflectionUtils.setField(processDefinitionDaoField, masterFailoverService, processDefinitionDao);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,