You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/08 07:08:22 UTC
[dolphinscheduler] branch dev updated: [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 37325b4c34 [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
37325b4c34 is described below
commit 37325b4c3410c2fe2f025c16363ea0c3a157647e
Author: Stalary <st...@163.com>
AuthorDate: Thu Sep 8 15:08:10 2022 +0800
[Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)
* FIX: dependent
* FIX: version
* MOD: for review
---
.../api/service/impl/ExecutorServiceImpl.java | 8 +++++---
.../dao/entity/DependentProcessDefinition.java | 17 +++++++++++++++--
.../dao/mapper/WorkFlowLineageMapper.xml | 1 +
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 957447dd68..b4fb3fb8aa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+ dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
- workerGroup);
+ workerGroup, processDefinitionCode);
}
/**
@@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
- String workerGroup) {
+ String workerGroup,
+ long upstreamProcessDefinitionCode) {
List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
List<Long> processDefinitionCodeList =
@@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
- if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
+ if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap
.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
index 9de57dff33..87bb3d4234 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
@@ -41,6 +41,11 @@ public class DependentProcessDefinition {
*/
private String processDefinitionName;
+ /**
+ * process definition version
+ **/
+ private int processDefinitionVersion;
+
/**
* task definition name
*/
@@ -60,14 +65,14 @@ public class DependentProcessDefinition {
* get dependent cycle
* @return CycleEnum
*/
- public CycleEnum getDependentCycle() {
+ public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
DependentParameters dependentParameters = this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
for (DependentItem dependentItem : dependentItemList) {
- if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
+ if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
return cycle2CycleEnum(dependentItem.getCycle());
}
}
@@ -122,6 +127,14 @@ public class DependentProcessDefinition {
this.processDefinitionCode = code;
}
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
+ }
+
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
+ }
+
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
index b17499bb60..2689b6d50f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
@@ -149,6 +149,7 @@
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
+ ,c.version as process_definition_version
,a.code AS task_definition_code
,a.task_params
FROM