You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/08/05 03:55:18 UTC
[dolphinscheduler] branch 2.0.7-prepare updated: fix dependent task when the dependent on process have forbidden tasks (#10952)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.7-prepare by this push:
new 749a9d2f43 fix dependent task when the dependent on process have forbidden tasks (#10952)
749a9d2f43 is described below
commit 749a9d2f43ef491944ba179d253a857c0ce71246
Author: Kevin.Shin <ma...@163.com>
AuthorDate: Fri Aug 5 11:55:12 2022 +0800
Fix dependent task when the depended-on process has forbidden tasks (#10952)
* Fix dependent task when the depended-on process has forbidden tasks
* Fix dependent task when the depended-on process has forbidden tasks
Co-authored-by: shenk-b <sh...@glodon.com>
---
.../server/utils/DependentExecute.java | 25 ++++++++++++----------
1 file changed, 14 insertions(+), 11 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index a26e8c6e29..627d481aca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
@@ -145,6 +146,7 @@ public class DependentExecute {
Map<Long, String> definiteTask = taskDefinitionLogs.stream().filter(log -> !log.getTaskType().equals(TaskType.SUB_PROCESS.getDesc())
|| !log.getTaskType().equals(TaskType.DEPENDENT.getDesc())
|| !log.getTaskType().equals(TaskType.CONDITIONS.getDesc()))
+ .filter(log -> log.getFlag().equals(Flag.YES))
.collect(Collectors.toMap(TaskDefinition::getCode, TaskDefinitionLog::getName));
if (!definiteTask.isEmpty()) {
List<TaskInstance> taskInstanceList = processService.findLastTaskInstanceListInterval(definiteTask.keySet(), dateInterval);
@@ -194,25 +196,26 @@ public class DependentExecute {
*/
private DependResult getDependTaskResult(ProcessInstance processInstance, long taskCode, DateInterval dateInterval) {
TaskInstance taskInstance = processService.findLastTaskInstanceInterval(taskCode, dateInterval);
- DependResult result;
if (taskInstance == null) {
- if (!processInstance.getState().typeIsFinished()) {
- logger.info("Wait for the dependent workflow to complete, taskCode:{}, processInstanceId:{}, processInstance state:{}",
- taskCode, processInstance.getId(), processInstance.getState());
- return DependResult.WAITING;
- }
TaskDefinition taskDefinition = processService.findTaskDefinitionByCode(taskCode);
if (taskDefinition == null) {
logger.error("Cannot find the task definition, something error, taskCode: {}", taskCode);
- } else {
- logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
+ return DependResult.FAILED;
+ }
+ if (taskDefinition.getFlag() == Flag.NO) {
+ logger.warn("Cannot find the task instance, but the task is forbidden, so dependent success, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
+ return DependResult.SUCCESS;
}
- result = DependResult.FAILED;
+ if (!processInstance.getState().typeIsFinished()) {
+ logger.info("Wait for the dependent workflow to complete, taskCode:{}, processInstanceId:{}, processInstance state:{}", taskCode, processInstance.getId(), processInstance.getState());
+ return DependResult.WAITING;
+ }
+ logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
+ return DependResult.FAILED;
} else {
logger.info("The running task, taskId:{}, taskCode:{}, taskName:{}", taskInstance.getId(), taskInstance.getTaskCode(), taskInstance.getName());
- result = getDependResultByState(taskInstance.getState());
+ return getDependResultByState(taskInstance.getState());
}
- return result;
}
/**