You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/08 07:08:22 UTC

[dolphinscheduler] branch dev updated: [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37325b4c34 [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
37325b4c34 is described below

commit 37325b4c3410c2fe2f025c16363ea0c3a157647e
Author: Stalary <st...@163.com>
AuthorDate: Thu Sep 8 15:08:10 2022 +0800

    [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
    
    * FIX: dependent
    
    * FIX: version
    
    * MOD: for review
---
 .../api/service/impl/ExecutorServiceImpl.java           |  8 +++++---
 .../dao/entity/DependentProcessDefinition.java          | 17 +++++++++++++++--
 .../dao/mapper/WorkFlowLineageMapper.xml                |  1 +
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..b4fb3fb8aa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
             dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
             dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
             Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
             cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
 
         return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
-                workerGroup);
+                workerGroup, processDefinitionCode);
     }
 
     /**
@@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
                                                                                   List<DependentProcessDefinition> dependentProcessDefinitionList,
                                                                                   CycleEnum processDefinitionCycle,
-                                                                                  String workerGroup) {
+                                                                                  String workerGroup,
+                                                                                  long upstreamProcessDefinitionCode) {
         List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
 
         List<Long> processDefinitionCodeList =
@@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                 processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
 
         for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
-            if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
+            if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
                 if (processDefinitionWorkerGroupMap
                         .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
                     dependentProcessDefinition.setWorkerGroup(workerGroup);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
index 9de57dff33..87bb3d4234 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
      */
     private String processDefinitionName;
 
+    /**
+     * process definition version
+     **/
+    private int processDefinitionVersion;
+
     /**
      * task definition name
      */
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
      * get dependent cycle
      * @return CycleEnum
      */
-    public CycleEnum getDependentCycle() {
+    public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
         DependentParameters dependentParameters = this.getDependentParameters();
         List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
 
         for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
             List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
             for (DependentItem dependentItem : dependentItemList) {
-                if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
+                if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
                     return cycle2CycleEnum(dependentItem.getCycle());
                 }
             }
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
         this.processDefinitionCode = code;
     }
 
+    public int getProcessDefinitionVersion() {
+        return processDefinitionVersion;
+    }
+
+    public void setProcessDefinitionVersion(int processDefinitionVersion) {
+        this.processDefinitionVersion = processDefinitionVersion;
+    }
+
     public long getTaskDefinitionCode() {
         return this.taskDefinitionCode;
     }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b17499bb60..2689b6d50f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -149,6 +149,7 @@
         SELECT
         c.code AS process_definition_code
         ,c.name AS process_definition_name
+        ,c.version as process_definition_version
         ,a.code AS task_definition_code
         ,a.task_params
         FROM