You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 10:32:33 UTC
[dolphinscheduler] branch 3.0.1-prepare updated: Add dependent task instance log (#11541) (#12014)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new 8fefb9ecfd Add dependent task instance log (#11541) (#12014)
8fefb9ecfd is described below
commit 8fefb9ecfdc047c56b2c9631d00f4f31a8083d1c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Sep 17 18:32:28 2022 +0800
Add dependent task instance log (#11541) (#12014)
* Add dependent task instance log
* Optimize log
* Fix dependent task initialize failed will throw exception
(cherry picked from commit a41c6824fcbd8e835023bedd32966720778aba18)
---
.../master/runner/task/DependentTaskProcessor.java | 118 +++++++++++++++++----
.../plugin/task/api/model/DependentTaskModel.java | 18 +---
.../task/api/parameters/DependentParameters.java | 19 +---
3 files changed, 101 insertions(+), 54 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index ee535d997b..700367afc9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -17,25 +17,37 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
-
+import com.google.auto.service.AutoService;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
-import com.google.auto.service.AutoService;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/**
* dependent task processor
@@ -45,6 +57,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
+ private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
+
+ private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
+
+ private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
+
/**
* dependent task list
*/
@@ -56,6 +74,10 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
*/
private Map<String, DependResult> dependResultMap = new HashMap<>();
+ private Map<Long, Project> projectCodeMap = new HashMap<>();
+ private Map<Long, ProcessDefinition> processDefinitionMap = new HashMap<>();
+ private Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
+
/**
* dependent date
*/
@@ -67,23 +89,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submitTask() {
- this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
+ try {
+ this.taskInstance =
+ processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
- if (this.taskInstance == null) {
+ if (this.taskInstance == null) {
+ return false;
+ }
+ this.setTaskExecutionLogger();
+ logger.info("Dependent task submit success");
+ taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
+ processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId()));
+ taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+ taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ taskInstance.setStartTime(new Date());
+ processService.updateTaskInstance(taskInstance);
+ initDependParameters();
+ logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
+ return true;
+ } catch (Exception ex) {
+ logger.error("Submit/Initialize dependent task error", ex);
return false;
}
- this.setTaskExecutionLogger();
- taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
- processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
- taskInstance.getProcessInstanceId(),
- taskInstance.getId()));
- taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
- taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- taskInstance.setStartTime(new Date());
- processService.updateTaskInstance(taskInstance);
- initDependParameters();
- return true;
}
@Override
@@ -122,14 +152,56 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
*/
private void initDependParameters() {
this.dependentParameters = taskInstance.getDependency();
- for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
- this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
- }
if (processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime();
} else {
this.dependentDate = new Date();
}
+ // check dependent project is exist
+ List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
+ Set<Long> projectCodes = new HashSet<>();
+ Set<Long> processDefinitionCodes = new HashSet<>();
+ Set<Long> taskDefinitionCodes = new HashSet<>();
+ dependTaskList.forEach(dependentTaskModel -> {
+ dependentTaskModel.getDependItemList().forEach(dependentItem -> {
+ projectCodes.add(dependentItem.getProjectCode());
+ processDefinitionCodes.add(dependentItem.getDefinitionCode());
+ taskDefinitionCodes.add(dependentItem.getDepTaskCode());
+ });
+ });
+ projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
+ processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
+ taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
+
+ for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
+ logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
+ for (DependentItem dependentItem : taskModel.getDependItemList()) {
+ Project project = projectCodeMap.get(dependentItem.getProjectCode());
+ if (project == null) {
+ logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
+ throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
+ }
+ ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
+ if (processDefinition == null) {
+ logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
+ throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
+ }
+ if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
+ logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
+ project.getName(), processDefinition.getName(), dependentItem.getKey());
+
+ } else {
+ TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
+ if (taskDefinition == null) {
+ logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
+ throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
+ }
+ logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
+ project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
+ }
+ }
+ this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
+ }
}
@Override
@@ -159,8 +231,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
- //save depend result to log
- logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue());
+ // save depend result to log
+ logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
@@ -182,7 +254,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
dependResultList.add(dependResult);
}
result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);
- logger.info("dependent task completed, dependent result: {}", result);
+ logger.info("Dependent task completed, dependent result: {}", result);
return result;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
index f1b6a5ea5f..3501db67c8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
@@ -17,29 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api.model;
+import lombok.Data;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import java.util.List;
+@Data
public class DependentTaskModel {
-
private List<DependentItem> dependItemList;
private DependentRelation relation;
- public List<DependentItem> getDependItemList() {
- return dependItemList;
- }
-
- public void setDependItemList(List<DependentItem> dependItemList) {
- this.dependItemList = dependItemList;
- }
-
- public DependentRelation getRelation() {
- return relation;
- }
-
- public void setRelation(DependentRelation relation) {
- this.relation = relation;
- }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
index bfbab6b759..e0e6b40068 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import java.util.List;
+@Data
+@EqualsAndHashCode(callSuper = true)
public class DependentParameters extends AbstractParameters {
private List<DependentTaskModel> dependTaskList;
@@ -32,19 +36,4 @@ public class DependentParameters extends AbstractParameters {
return true;
}
- public List<DependentTaskModel> getDependTaskList() {
- return dependTaskList;
- }
-
- public void setDependTaskList(List<DependentTaskModel> dependTaskList) {
- this.dependTaskList = dependTaskList;
- }
-
- public DependentRelation getRelation() {
- return relation;
- }
-
- public void setRelation(DependentRelation relation) {
- this.relation = relation;
- }
}