You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/04/02 16:16:13 UTC
[incubator-dolphinscheduler] branch dev updated: #2282 fix workflow dependent bug (#2329)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 69e000b #2282 fix workflow dependent bug (#2329)
69e000b is described below
commit 69e000b54214e8331ede69bdce9710b8418e960e
Author: zixi0825 <64...@qq.com>
AuthorDate: Fri Apr 3 00:16:01 2020 +0800
#2282 fix workflow dependent bug (#2329)
* fix workflow dependent bug
* fix workflow dependent bug 2
* fix workflow dependent bug 2
Co-authored-by: sunchaohe <su...@linklogis.com>
Co-authored-by: dailidong <da...@gmail.com>
---
.../src/main/resources/application.properties | 2 +-
.../worker/task/dependent/DependentExecute.java | 95 +++++++++++++++++----
.../worker/task/dependent/DependentTask.java | 9 +-
.../worker/task/dependent/DependentTaskTest.java | 99 +++++++++++++++++-----
.../service/process/ProcessService.java | 24 ++++++
pom.xml | 1 +
6 files changed, 187 insertions(+), 43 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/application.properties
index 06b0ee9..c928c72 100644
--- a/dolphinscheduler-dao/src/main/resources/application.properties
+++ b/dolphinscheduler-dao/src/main/resources/application.properties
@@ -22,7 +22,7 @@ spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
# mysql
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-#spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
+#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
# h2
#spring.datasource.driver-class-name=org.h2.Driver
#spring.datasource.url=jdbc:h2:file:../sql/h2;AUTO_SERVER=TRUE
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
index b08cabc..087bb80 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -82,7 +83,7 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
- public DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
+ private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals );
}
@@ -94,7 +95,8 @@ public class DependentExecute {
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
- List<DateInterval> dateIntervals) {
+ List<DateInterval> dateIntervals) {
+
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
@@ -104,25 +106,35 @@ public class DependentExecute {
dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
return DependResult.FAILED;
}
+ // need to check workflow for updates, so get all task and check the task state
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
- result = getDependResultByState(processInstance.getState());
- }else{
- TaskInstance taskInstance = null;
- List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskNode> taskNodes =
+ processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId());
- for(TaskInstance task : taskInstanceList){
- if(task.getName().equals(dependentItem.getDepTasks())){
- taskInstance = task;
- break;
+ if(taskNodes != null && taskNodes.size() > 0){
+ List<DependResult> results = new ArrayList<>();
+ DependResult tmpResult = DependResult.FAILED;
+ for(TaskNode taskNode:taskNodes){
+ tmpResult = getDependTaskResult(taskNode.getName(),processInstance);
+ if(DependResult.FAILED == tmpResult){
+ break;
+ }else{
+ results.add(getDependTaskResult(taskNode.getName(),processInstance));
+ }
+ }
+
+ if(DependResult.FAILED == tmpResult){
+ result = DependResult.FAILED;
+ }else if(results.contains(DependResult.WAITING)){
+ result = DependResult.WAITING;
+ }else{
+ result = DependResult.SUCCESS;
}
- }
- if(taskInstance == null){
- // cannot find task in the process instance
- // maybe because process instance is running or failed.
- result = getDependResultByState(processInstance.getState());
}else{
- result = getDependResultByState(taskInstance.getState());
+ result = DependResult.FAILED;
}
+ }else{
+ result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
}
if(result != DependResult.SUCCESS){
break;
@@ -132,6 +144,35 @@ public class DependentExecute {
}
/**
+ * get depend task result
+ * @param taskName
+ * @param processInstance
+ * @return
+ */
+ private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) {
+ DependResult result = DependResult.FAILED;
+ TaskInstance taskInstance = null;
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+
+ for(TaskInstance task : taskInstanceList){
+ if(task.getName().equals(taskName)){
+ taskInstance = task;
+ break;
+ }
+ }
+
+ if(taskInstance == null){
+ // cannot find task in the process instance
+ // maybe because process instance is running or failed.
+ result = getDependResultByProcessStateWhenTaskNull(processInstance.getState());
+ }else{
+ result = getDependResultByState(taskInstance.getState());
+ }
+
+ return result;
+ }
+
+ /**
* find the last one process instance that :
* 1. manual run and finish between the interval
* 2. schedule run and schedule time between the interval
@@ -172,7 +213,9 @@ public class DependentExecute {
*/
private DependResult getDependResultByState(ExecutionStatus state) {
- if(state.typeIsRunning() || state == ExecutionStatus.SUBMITTED_SUCCESS || state == ExecutionStatus.WAITTING_THREAD){
+ if(state.typeIsRunning()
+ || state == ExecutionStatus.SUBMITTED_SUCCESS
+ || state == ExecutionStatus.WAITTING_THREAD){
return DependResult.WAITING;
}else if(state.typeIsSuccess()){
return DependResult.SUCCESS;
@@ -182,6 +225,22 @@ public class DependentExecute {
}
/**
+ * get dependent result by task instance state when task instance is null
+ * @param state state
+ * @return DependResult
+ */
+ private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
+
+ if(state.typeIsRunning()
+ || state == ExecutionStatus.SUBMITTED_SUCCESS
+ || state == ExecutionStatus.WAITTING_THREAD){
+ return DependResult.WAITING;
+ }else{
+ return DependResult.FAILED;
+ }
+ }
+
+ /**
* judge depend item finished
* @param currentTime current time
* @return boolean
@@ -222,7 +281,7 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
- public DependResult getDependResultForItem(DependentItem item, Date currentTime){
+ private DependResult getDependResultForItem(DependentItem item, Date currentTime){
String key = item.getKey();
if(dependResultMap.containsKey(key)){
return dependResultMap.get(key);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
index f074d57..b426d32 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
@@ -82,10 +82,11 @@ public class DependentTask extends AbstractTask {
this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(),
DependentParameters.class);
-
- for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
- this.dependentTaskList.add(new DependentExecute(
- taskModel.getDependItemList(), taskModel.getRelation()));
+ if(dependentParameters != null){
+ for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
+ this.dependentTaskList.add(new DependentExecute(
+ taskModel.getDependItemList(), taskModel.getRelation()));
+ }
}
this.processService = SpringApplicationContext.getBean(ProcessService.class);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
index 272fb54..c13a764 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
@@ -17,47 +17,106 @@
package org.apache.dolphinscheduler.server.worker.task.dependent;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.model.DateInterval;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
public class DependentTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
+ private ProcessService processService;
+ private ApplicationContext applicationContext;
- @Test
- public void testDependInit() throws Exception{
- TaskProps taskProps = new TaskProps();
+ @Before
+ public void before() throws Exception{
+ processService = Mockito.mock(ProcessService.class);
+ Mockito.when(processService
+ .findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0)))
+ .thenReturn(findLastProcessInterval());
+ Mockito.when(processService
+ .getTaskNodeListByDefinitionId(4))
+ .thenReturn(getTaskNodes());
+ Mockito.when(processService
+ .findValidTaskListByProcessId(11))
+ .thenReturn(getTaskInstances());
- String dependString = "{\n" +
- "\"dependTaskList\":[\n" +
- " {\n" +
- " \"dependItemList\":[\n" +
- " {\n" +
- " \"definitionId\": 101,\n" +
- " \"depTasks\": \"ALL\",\n" +
- " \"cycle\": \"day\",\n" +
- " \"dateValue\": \"last1Day\"\n" +
- " }\n" +
- " ],\n" +
- " \"relation\": \"AND\"\n" +
- " }\n" +
- " ],\n" +
- "\"relation\":\"OR\"\n" +
- "}";
+ Mockito.when(processService
+ .findTaskInstanceById(252612))
+ .thenReturn(getTaskInstance());
+ applicationContext = Mockito.mock(ApplicationContext.class);
+ SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+ springApplicationContext.setApplicationContext(applicationContext);
+ Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+ }
+ @Test
+ public void test() throws Exception{
+
+ TaskProps taskProps = new TaskProps();
+ String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskProps.setTaskInstId(252612);
taskProps.setDependence(dependString);
+ taskProps.setTaskStartTime(new Date());
DependentTask dependentTask = new DependentTask(taskProps, logger);
dependentTask.init();
dependentTask.handle();
- Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE );
+ Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS );
}
+ private ProcessInstance findLastProcessInterval(){
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(11);
+ processInstance.setState(ExecutionStatus.SUCCESS);
+ return processInstance;
+ }
+
+ private List<TaskNode> getTaskNodes(){
+ List<TaskNode> list = new ArrayList<>();
+ TaskNode taskNode = new TaskNode();
+ taskNode.setName("C");
+ taskNode.setType("SQL");
+ list.add(taskNode);
+ return list;
+ }
+ private List<TaskInstance> getTaskInstances(){
+ List<TaskInstance> list = new ArrayList<>();
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setName("C");
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ taskInstance.setDependency("1231");
+ list.add(taskInstance);
+ return list;
+ }
+
+ private TaskInstance getTaskInstance(){
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(252612);
+ taskInstance.setName("C");
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ return taskInstance;
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 3312c10..ce4424e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -237,6 +237,30 @@ public class ProcessService {
}
/**
+ * get task node list by definitionId
+ * @param defineId
+ * @return
+ */
+ public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
+ ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
+ if (processDefinition == null) {
+ logger.info("process define not exists");
+ return null;
+ }
+
+ String processDefinitionJson = processDefinition.getProcessDefinitionJson();
+ ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
+
+ //process data check
+ if (null == processData) {
+ logger.error("process data is null");
+ return null;
+ }
+
+ return processData.getTasks();
+ }
+
+ /**
* find process instance by id
* @param processId processId
* @return process instance
diff --git a/pom.xml b/pom.xml
index ec0435c..84df526 100644
--- a/pom.xml
+++ b/pom.xml
@@ -740,6 +740,7 @@
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
+ <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/dao/datasource/BaseDataSourceTest.java</include>