You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/08/02 11:09:48 UTC
[dolphinscheduler] 10/11: Fix recovery from failed task will dead loop (#11239)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 072672a12771b77d69e3bd2247395ad6fca6d3f6
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Aug 2 09:30:18 2022 +0800
Fix recovery from failed task will dead loop (#11239)
(cherry picked from commit 04f3aa97135d79469daf7c21c935029faff827b2)
---
.../server/master/runner/WorkflowExecuteRunnable.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9413707dcd..005a41d26e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -82,7 +82,6 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@@ -1820,12 +1819,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// todo: Can we use a better way to set the recover taskInstanceId list, rather than using the cmdParam?
if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
- String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
- if (ArrayUtils.isNotEmpty(idList)) {
- List<Integer> taskInstanceIds = Arrays.stream(idList)
- .map(Integer::valueOf)
- .collect(Collectors.toList());
- return processService.findTaskInstanceByIdList(taskInstanceIds);
+ List<Integer> startTaskInstanceIds = Arrays.stream(paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING)
+ .split(COMMA))
+ .filter(StringUtils::isNotEmpty)
+ .map(Integer::valueOf)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) {
+ return processService.findTaskInstanceByIdList(startTaskInstanceIds);
}
}
return Collections.emptyList();