You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/13 14:32:35 UTC
[dolphinscheduler] branch dev updated: Fix error when killing YARN jobs during failover, caused by ProcessDefinition not being set (#10948)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b245e7c973 Fix kill yarn job error when failover caused by doesn't set ProcessDefinition (#10948)
b245e7c973 is described below
commit b245e7c973a5282a1405097605a4afacac1b5eab
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 13 22:32:30 2022 +0800
Fix kill yarn job error when failover caused by doesn't set ProcessDefinition (#10948)
---
.../master/service/MasterFailoverService.java | 21 +++++++++++++++------
.../master/service/WorkerFailoverService.java | 9 +++++----
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index c7e5b4ea13..61ba7c3fd6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -114,16 +115,18 @@ public class MasterFailoverService {
* @param masterHost master host
*/
private void doFailoverMaster(@NonNull String masterHost) {
- LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
StopWatch failoverTimeCost = StopWatch.createStarted();
- Optional<Date> masterStartupTimeOptional =
- getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
- List<ProcessInstance> needFailoverProcessInstanceList =
- processService.queryNeedFailoverProcessInstances(masterHost);
+ Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
+ masterHost);
+ List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
+ masterHost);
+ if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
+ return;
+ }
LOGGER.info(
- "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
+ "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
masterHost,
needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
@@ -136,6 +139,11 @@ public class MasterFailoverService {
LOGGER.info("WorkflowInstance doesn't need to failover");
continue;
}
+ // todo: use batch query
+ ProcessDefinition processDefinition
+ = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+ processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
for (TaskInstance taskInstance : taskInstanceList) {
@@ -205,6 +213,7 @@ public class MasterFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
+ .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index ec126a3ec3..402ec43354 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -115,10 +115,10 @@ public class WorkerFailoverService {
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
try {
- ProcessInstance processInstance =
- processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
- WorkflowExecuteRunnable workflowExecuteRunnable =
- cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
+ ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
+ taskInstance.getProcessInstanceId(), k -> {
+ WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
+ taskInstance.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
return null;
}
@@ -167,6 +167,7 @@ public class WorkerFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
+ .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {