You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 10:32:33 UTC

[dolphinscheduler] branch 3.0.1-prepare updated: Add dependent task instance log (#11541) (#12014)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
     new 8fefb9ecfd Add dependent task instance log (#11541) (#12014)
8fefb9ecfd is described below

commit 8fefb9ecfdc047c56b2c9631d00f4f31a8083d1c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Sep 17 18:32:28 2022 +0800

    Add dependent task instance log (#11541) (#12014)
    
    * Add dependent task instance log
    
    * Optimize log
    
    * Fix uncaught exception thrown when dependent task initialization fails
    
    (cherry picked from commit a41c6824fcbd8e835023bedd32966720778aba18)
---
 .../master/runner/task/DependentTaskProcessor.java | 118 +++++++++++++++++----
 .../plugin/task/api/model/DependentTaskModel.java  |  18 +---
 .../task/api/parameters/DependentParameters.java   |  19 +---
 3 files changed, 101 insertions(+), 54 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index ee535d997b..700367afc9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -17,25 +17,37 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
-
+import com.google.auto.service.AutoService;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import com.google.auto.service.AutoService;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
 
 /**
  * dependent task processor
@@ -45,6 +57,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
 
     private DependentParameters dependentParameters;
 
+    private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
+
+    private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
+
+    private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
+
     /**
      * dependent task list
      */
@@ -56,6 +74,10 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
      */
     private Map<String, DependResult> dependResultMap = new HashMap<>();
 
+    private Map<Long, Project> projectCodeMap = new HashMap<>();
+    private Map<Long, ProcessDefinition> processDefinitionMap = new HashMap<>();
+    private Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
+
     /**
      * dependent date
      */
@@ -67,23 +89,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
 
     @Override
     public boolean submitTask() {
-        this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
+        try {
+            this.taskInstance =
+                    processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
 
-        if (this.taskInstance == null) {
+            if (this.taskInstance == null) {
+                return false;
+            }
+            this.setTaskExecutionLogger();
+            logger.info("Dependent task submit success");
+            taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
+                    processInstance.getProcessDefinitionCode(),
+                    processInstance.getProcessDefinitionVersion(),
+                    taskInstance.getProcessInstanceId(),
+                    taskInstance.getId()));
+            taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+            taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+            taskInstance.setStartTime(new Date());
+            processService.updateTaskInstance(taskInstance);
+            initDependParameters();
+            logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
+            return true;
+        } catch (Exception ex) {
+            logger.error("Submit/Initialize dependent task error", ex);
             return false;
         }
-        this.setTaskExecutionLogger();
-        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
-                processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion(),
-                taskInstance.getProcessInstanceId(),
-                taskInstance.getId()));
-        taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        taskInstance.setStartTime(new Date());
-        processService.updateTaskInstance(taskInstance);
-        initDependParameters();
-        return true;
     }
 
     @Override
@@ -122,14 +152,56 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
      */
     private void initDependParameters() {
         this.dependentParameters = taskInstance.getDependency();
-        for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
-            this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
-        }
         if (processInstance.getScheduleTime() != null) {
             this.dependentDate = this.processInstance.getScheduleTime();
         } else {
             this.dependentDate = new Date();
         }
+        // check dependent project is exist
+        List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
+        Set<Long> projectCodes = new HashSet<>();
+        Set<Long> processDefinitionCodes = new HashSet<>();
+        Set<Long> taskDefinitionCodes = new HashSet<>();
+        dependTaskList.forEach(dependentTaskModel -> {
+            dependentTaskModel.getDependItemList().forEach(dependentItem -> {
+                projectCodes.add(dependentItem.getProjectCode());
+                processDefinitionCodes.add(dependentItem.getDefinitionCode());
+                taskDefinitionCodes.add(dependentItem.getDepTaskCode());
+            });
+        });
+        projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
+        processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
+        taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
+
+        for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
+            logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
+            for (DependentItem dependentItem : taskModel.getDependItemList()) {
+                Project project = projectCodeMap.get(dependentItem.getProjectCode());
+                if (project == null) {
+                    logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
+                    throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
+                }
+                ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
+                if (processDefinition == null) {
+                    logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
+                    throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
+                }
+                if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
+                    logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
+                            project.getName(), processDefinition.getName(), dependentItem.getKey());
+
+                } else {
+                    TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
+                    if (taskDefinition == null) {
+                        logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
+                        throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
+                    }
+                    logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
+                            project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
+                }
+            }
+            this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
+        }
     }
 
     @Override
@@ -159,8 +231,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
             for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
                 if (!dependResultMap.containsKey(entry.getKey())) {
                     dependResultMap.put(entry.getKey(), entry.getValue());
-                    //save depend result to log
-                    logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue());
+                    // save depend result to log
+                    logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
                 }
             }
             if (!dependentExecute.finish(dependentDate)) {
@@ -182,7 +254,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
             dependResultList.add(dependResult);
         }
         result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);
-        logger.info("dependent task completed, dependent result: {}", result);
+        logger.info("Dependent task completed, dependent result: {}", result);
         return result;
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
index f1b6a5ea5f..3501db67c8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java
@@ -17,29 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.model;
 
+import lombok.Data;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 
 import java.util.List;
 
+@Data
 public class DependentTaskModel {
 
-
     private List<DependentItem> dependItemList;
     private DependentRelation relation;
 
-    public List<DependentItem> getDependItemList() {
-        return dependItemList;
-    }
-
-    public void setDependItemList(List<DependentItem> dependItemList) {
-        this.dependItemList = dependItemList;
-    }
-
-    public DependentRelation getRelation() {
-        return relation;
-    }
-
-    public void setRelation(DependentRelation relation) {
-        this.relation = relation;
-    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
index bfbab6b759..e0e6b40068 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java
@@ -17,11 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.parameters;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
 import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
 
 import java.util.List;
 
+@Data
+@EqualsAndHashCode(callSuper = true)
 public class DependentParameters extends AbstractParameters {
 
     private List<DependentTaskModel> dependTaskList;
@@ -32,19 +36,4 @@ public class DependentParameters extends AbstractParameters {
         return true;
     }
 
-    public List<DependentTaskModel> getDependTaskList() {
-        return dependTaskList;
-    }
-
-    public void setDependTaskList(List<DependentTaskModel> dependTaskList) {
-        this.dependTaskList = dependTaskList;
-    }
-
-    public DependentRelation getRelation() {
-        return relation;
-    }
-
-    public void setRelation(DependentRelation relation) {
-        this.relation = relation;
-    }
 }