You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/19 10:31:35 UTC
[dolphinscheduler] branch dev updated: [DS-6891][MasterServer] reduce db operation when process instance running (#6904)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 28872d8 [DS-6891][MasterServer] reduce db operation when process instance running (#6904)
28872d8 is described below
commit 28872d8706f7414868d6101600f9f8bbd3c9d15f
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Fri Nov 19 18:31:24 2021 +0800
[DS-6891][MasterServer] reduce db operation when process instance running (#6904)
* remove taskInstanceCacheManager
* remove taskInstanceCacheManager
* [DS-6891][MasterServer] reduce db operation when process instance running
* [DS-6891][MasterServer] reduce db operation when process instance running
* fix test
* fix Transactional method call
* checkstyle
Co-authored-by: caishunfeng <53...@qq.com>
---
.../master/cache/TaskInstanceCacheManager.java | 64 ----
.../cache/impl/TaskInstanceCacheManagerImpl.java | 155 --------
.../master/consumer/TaskPriorityQueueConsumer.java | 28 +-
.../server/master/processor/TaskAckProcessor.java | 10 -
.../master/processor/TaskResponseProcessor.java | 10 -
.../processor/queue/StateEventResponseService.java | 9 +
.../processor/queue/TaskResponseService.java | 108 ++++--
.../master/runner/WorkflowExecuteThread.java | 412 ++++++++++++++-------
.../master/runner/task/BaseTaskProcessor.java | 8 +-
.../master/runner/task/CommonTaskProcessor.java | 2 +-
.../master/runner/task/ConditionTaskProcessor.java | 2 +-
.../master/runner/task/DependentTaskProcessor.java | 3 +-
.../master/runner/task/SubTaskProcessor.java | 2 +-
.../master/runner/task/SwitchTaskProcessor.java | 2 +-
.../server/master/ConditionsTaskTest.java | 2 +-
.../server/master/DependentTaskTest.java | 9 +-
.../server/master/SubProcessTaskTest.java | 10 +-
.../server/master/SwitchTaskTest.java | 2 +-
.../server/master/WorkflowExecuteThreadTest.java | 39 +-
.../impl/TaskInstanceCacheManagerImplTest.java | 177 ---------
.../consumer/TaskPriorityQueueConsumerTest.java | 9 -
.../master/processor/TaskAckProcessorTest.java | 10 +-
.../runner/task/CommonTaskProcessorTest.java | 1 -
.../server/registry/DependencyConfig.java | 6 -
.../processor/TaskCallbackServiceTestConfig.java | 6 -
.../service/alert/ProcessAlertManager.java | 49 ++-
.../service/process/ProcessService.java | 91 ++---
.../service/queue/TaskPriority.java | 22 +-
28 files changed, 504 insertions(+), 744 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
deleted file mode 100644
index 1388c5b..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-/**
- * task instance state manager
- */
-public interface TaskInstanceCacheManager {
-
- /**
- * get taskInstance by taskInstance id
- *
- * @param taskInstanceId taskInstanceId
- * @return taskInstance
- */
- TaskInstance getByTaskInstanceId(Integer taskInstanceId);
-
- /**
- * cache taskInstance
- *
- * @param taskExecutionContext taskExecutionContext
- */
- void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
-
- /**
- * cache taskInstance
- *
- * @param taskAckCommand taskAckCommand
- */
- void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
-
- /**
- * cache taskInstance
- *
- * @param taskExecuteResponseCommand taskExecuteResponseCommand
- */
- void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
-
- /**
- * remove taskInstance by taskInstanceId
- * @param taskInstanceId taskInstanceId
- */
- void removeByTaskInstanceId(Integer taskInstanceId);
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
deleted file mode 100644
index dd2d6eb..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache.impl;
-
-import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * taskInstance state manager
- */
-@Component
-public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
-
- /**
- * taskInstance cache
- */
- private Map<Integer,TaskInstance> taskInstanceCache = new ConcurrentHashMap<>();
-
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- /**
- * taskInstance cache refresh timer
- */
- private Timer refreshTaskInstanceTimer = null;
-
- @PostConstruct
- public void init() {
- //issue#5539 add thread to fetch task state from database in a fixed rate
- this.refreshTaskInstanceTimer = new Timer(true);
- refreshTaskInstanceTimer.scheduleAtFixedRate(
- new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS
- );
- }
-
- @PreDestroy
- public void close() {
- this.refreshTaskInstanceTimer.cancel();
- }
-
- /**
- * get taskInstance by taskInstance id
- *
- * @param taskInstanceId taskInstanceId
- * @return taskInstance
- */
- @Override
- public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
- return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId));
- }
-
- /**
- * cache taskInstance
- *
- * @param taskExecutionContext taskExecutionContext
- */
- @Override
- public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(taskExecutionContext.getTaskInstanceId());
- taskInstance.setName(taskExecutionContext.getTaskName());
- taskInstance.setStartTime(taskExecutionContext.getStartTime());
- taskInstance.setTaskType(taskExecutionContext.getTaskType());
- taskInstance.setExecutePath(taskExecutionContext.getExecutePath());
- taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance);
- }
-
- /**
- * cache taskInstance
- *
- * @param taskAckCommand taskAckCommand
- */
- @Override
- public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
- taskInstance.setStartTime(taskAckCommand.getStartTime());
- taskInstance.setHost(taskAckCommand.getHost());
- taskInstance.setExecutePath(taskAckCommand.getExecutePath());
- taskInstance.setLogPath(taskAckCommand.getLogPath());
- taskInstanceCache.put(taskAckCommand.getTaskInstanceId(), taskInstance);
- }
-
- /**
- * cache taskInstance
- *
- * @param taskExecuteResponseCommand taskExecuteResponseCommand
- */
- @Override
- public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
- TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
- taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
- taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
- taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance);
- }
-
- /**
- * remove taskInstance by taskInstanceId
- * @param taskInstanceId taskInstanceId
- */
- @Override
- public void removeByTaskInstanceId(Integer taskInstanceId) {
- taskInstanceCache.remove(taskInstanceId);
- }
-
- class RefreshTaskInstanceTimerTask extends TimerTask {
- @Override
- public void run() {
- for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
- TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
- if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
- taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance);
- }
- }
-
- }
- }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 827759b..290164b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -130,14 +130,16 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
- if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
- // when task finish, ignore this task, there is no need to dispatch anymore
- return true;
- } else {
- result = dispatcher.dispatch(executionContext);
+ if (isTaskNeedToCheck(taskPriority)) {
+ if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
+ // when task finish, ignore this task, there is no need to dispatch anymore
+ return true;
+ }
}
+
+ result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
- logger.error("dispatch error: {}", e.getMessage(),e);
+ logger.error("dispatch error: {}", e.getMessage(), e);
}
return result;
}
@@ -153,4 +155,18 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished();
}
+
+ /**
+ * check if task need to check state, if true, refresh the checkpoint
+ * @param taskPriority
+ * @return
+ */
+ private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
+ long now = System.currentTimeMillis();
+ if (now - taskPriority.getCheckpoint() > Constants.SECOND_TIME_MILLIS) {
+ taskPriority.setCheckpoint(now);
+ return true;
+ }
+ return false;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 8761232..27b8991 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -49,14 +47,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskResponseService taskResponseService;
- /**
- * taskInstance cache manager
- */
- private final TaskInstanceCacheManager taskInstanceCacheManager;
-
public TaskAckProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
- this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@@ -71,8 +63,6 @@ public class TaskAckProcessor implements NettyRequestProcessor {
TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
- taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
-
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 405e6be..aa324eb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -48,14 +46,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskResponseService taskResponseService;
- /**
- * taskInstance cache manager
- */
- private final TaskInstanceCacheManager taskInstanceCacheManager;
-
public TaskResponseProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
- this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@@ -72,8 +64,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
- taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 72e2355..9a7fc59 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -133,6 +133,15 @@ public class StateEventResponseService {
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+ switch (stateEvent.getType()) {
+ case TASK_STATE_CHANGE:
+ workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
+ break;
+ case PROCESS_STATE_CHANGE:
+ workflowExecuteThread.refreshProcessInstance(stateEvent.getProcessInstanceId());
+ break;
+ default:
+ }
workflowExecuteThread.addStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 9af1ae2..7a0af9d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -142,54 +142,28 @@ public class TaskResponseService {
*/
private void persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
- Channel channel = taskResponseEvent.getChannel();
+ int taskInstanceId = taskResponseEvent.getTaskInstanceId();
+ int processInstanceId = taskResponseEvent.getProcessInstanceId();
+
+ TaskInstance taskInstance;
+ WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
+ taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+ } else {
+ taskInstance = processService.findTaskInstanceById(taskInstanceId);
+ }
- TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
switch (event) {
case ACK:
- try {
- if (taskInstance != null) {
- ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
- processService.changeTaskState(taskInstance, status,
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
- }
- // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker ack master error", e);
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskAckCommand.convert2Command());
- }
+ handleAckEvent(taskResponseEvent, taskInstance);
break;
case RESULT:
- try {
- if (taskInstance != null) {
- processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId(),
- taskResponseEvent.getVarPool()
- );
- }
- // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
- DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskResponseCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker response master error", e);
- DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskResponseCommand.convert2Command());
- }
+ handleResultEvent(taskResponseEvent, taskInstance);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
- WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId());
+
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
@@ -200,7 +174,59 @@ public class TaskResponseService {
}
}
- public BlockingQueue<TaskResponseEvent> getEventQueue() {
- return eventQueue;
+ /**
+ * handle ack event
+ * @param taskResponseEvent
+ * @param taskInstance
+ */
+ private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+ Channel channel = taskResponseEvent.getChannel();
+ try {
+ if (taskInstance != null) {
+ if (taskInstance.getState().typeIsFinished()) {
+ logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
+ } else {
+ processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath()
+ );
+ }
+ }
+ // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskAckCommand.convert2Command());
+ } catch (Exception e) {
+ logger.error("worker ack master error", e);
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+ channel.writeAndFlush(taskAckCommand.convert2Command());
+ }
+ }
+
+ /**
+ * handle result event
+ * @param taskResponseEvent
+ * @param taskInstance
+ */
+ private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+ Channel channel = taskResponseEvent.getChannel();
+ try {
+ if (taskInstance != null) {
+ processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+ taskResponseEvent.getEndTime(),
+ taskResponseEvent.getProcessId(),
+ taskResponseEvent.getAppIds(),
+ taskResponseEvent.getVarPool()
+ );
+ }
+ // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
+ DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskResponseCommand.convert2Command());
+ } catch (Exception e) {
+ logger.error("worker response master error", e);
+ DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
+ channel.writeAndFlush(taskResponseCommand.convert2Command());
+ }
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index ad12abe..44eac21 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -86,9 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
-import com.google.common.collect.Table;
/**
* master exec thread,split dag
@@ -99,100 +97,116 @@ public class WorkflowExecuteThread implements Runnable {
* logger of WorkflowExecuteThread
*/
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
+
/**
- * runing TaskNode
+ * master config
*/
- private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
+ private MasterConfig masterConfig;
/**
- * process instance
+ * process service
*/
- private ProcessInstance processInstance;
+ private ProcessService processService;
+
/**
- * submit failure nodes
+ * alert manager
*/
- private boolean taskFailedSubmit = false;
+ private ProcessAlertManager processAlertManager;
/**
- * recover node id list
+ * netty executor manager
*/
- private List<TaskInstance> recoverNodeIdList = new ArrayList<>();
+ private NettyExecutorManager nettyExecutorManager;
/**
- * error task list
+ * process instance
*/
- private Map<String, TaskInstance> errorTaskList = new ConcurrentHashMap<>();
+ private ProcessInstance processInstance;
/**
- * complete task list
+ * process definition
*/
- private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+ private ProcessDefinition processDefinition;
/**
- * ready to submit task queue
+ * the object of DAG
*/
- private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
+ private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
- * depend failed task map
+ * key of workflow
*/
- private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
+ private String key;
/**
- * forbidden task map
+ * start flag, true: start nodes submit completely
*/
- private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
+ private boolean isStart = false;
/**
- * skip task map
+ * submit failure nodes
*/
- private Map<String, TaskNode> skipTaskNodeList = new ConcurrentHashMap<>();
+ private boolean taskFailedSubmit = false;
/**
- * recover tolerance fault task list
+ * task instance hash map, taskId as key
*/
- private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();
+ private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
- * alert manager
+ * running TaskNode, taskId as key
*/
- private ProcessAlertManager processAlertManager;
+ private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
- * the object of DAG
+ * valid task map, taskCode as key, taskId as value
*/
- private DAG<String, TaskNode, TaskNodeRelation> dag;
+ private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
- * process service
+ * error task map, taskCode as key, taskId as value
*/
- private ProcessService processService;
+ private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
- * master config
+ * complete task map, taskCode as key, taskId as value
*/
- private MasterConfig masterConfig;
+ private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
- *
+ * depend failed task map, taskCode as key, taskId as value
*/
- private NettyExecutorManager nettyExecutorManager;
+ private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
- private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+ /**
+ * forbidden task map, code as key
+ */
+ private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
- private List<Date> complementListDate = Lists.newLinkedList();
+ /**
+ * skip task map, code as key
+ */
+ private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
- private Table<Integer, Long, TaskInstance> taskInstanceHashMap = HashBasedTable.create();
- private ProcessDefinition processDefinition;
- private String key;
+ /**
+ * complement date list
+ */
+ private List<Date> complementListDate = Lists.newLinkedList();
+ /**
+ * task timeout check list
+ */
private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
/**
- * start flag, true: start nodes submit completely
- *
+ * state event queue
*/
- private boolean isStart = false;
+ private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+
+ /**
+ * ready to submit task queue
+ */
+ private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* constructor of WorkflowExecuteThread
@@ -200,7 +214,6 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
- * @param taskTimeoutCheckList
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
@@ -232,7 +245,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* the process start nodes are submitted completely.
- * @return
*/
public boolean isStart() {
return this.isStart;
@@ -286,9 +298,10 @@ public class WorkflowExecuteThread implements Runnable {
private boolean stateEventHandler(StateEvent stateEvent) {
logger.info("process event: {}", stateEvent.toString());
- if (!checkStateEvent(stateEvent)) {
+ if (!checkProcessInstance(stateEvent)) {
return false;
}
+
boolean result = false;
switch (stateEvent.getType()) {
case PROCESS_STATE_CHANGE:
@@ -314,16 +327,11 @@ public class WorkflowExecuteThread implements Runnable {
}
private boolean taskTimeout(StateEvent stateEvent) {
-
- if (taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) {
+ if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
- TaskInstance taskInstance = taskInstanceHashMap
- .row(stateEvent.getTaskInstanceId())
- .values()
- .iterator().next();
-
+ TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId());
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
@@ -344,7 +352,16 @@ public class WorkflowExecuteThread implements Runnable {
}
private boolean taskStateChangeHandler(StateEvent stateEvent) {
- TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
+ if (!checkTaskInstanceByStateEvent(stateEvent)) {
+ return true;
+ }
+
+ TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+ if (task.getState() == null) {
+ logger.error("task state is null, state handler error: {}", stateEvent);
+ return true;
+ }
+
if (task.getState().typeIsFinished()) {
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
@@ -356,7 +373,7 @@ public class WorkflowExecuteThread implements Runnable {
taskFinished(task);
}
} else {
- logger.error("state handler error: {}", stateEvent.toString());
+ logger.error("state handler error: {}", stateEvent);
}
return true;
}
@@ -382,10 +399,11 @@ public class WorkflowExecuteThread implements Runnable {
}
return;
}
- ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
- completeTaskList.put(Long.toString(task.getTaskCode()), task);
+
+ completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
+
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
@@ -395,7 +413,7 @@ public class WorkflowExecuteThread implements Runnable {
|| DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
submitPostNode(Long.toString(task.getTaskCode()));
} else {
- errorTaskList.put(Long.toString(task.getTaskCode()), task);
+ errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@@ -404,20 +422,102 @@ public class WorkflowExecuteThread implements Runnable {
this.updateProcessInstanceState();
}
- private boolean checkStateEvent(StateEvent stateEvent) {
+ /**
+ * reload the process instance and its process definition from the database
+ * into memory. If the process instance can no longer be found, the currently
+ * cached instance is kept instead of being overwritten with null.
+ *
+ * @param processInstanceId id of the process instance to reload
+ */
+ public void refreshProcessInstance(int processInstanceId) {
+ logger.info("process instance update: {}", processInstanceId);
+ ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId);
+ if (newProcessInstance == null) {
+ // keep the cached instance; dereferencing null below would throw and leave the field null
+ logger.error("can not find process instance, id:{}", processInstanceId);
+ return;
+ }
+ processInstance = newProcessInstance;
+ processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
+ processInstance.setProcessDefinition(processDefinition);
+ }
+
+ /**
+ * reload a task instance from the database, package it with the current
+ * process instance and cache it in taskInstanceMap. The validTaskMap entry
+ * for its task code is rebuilt from the reloaded instance's flag.
+ *
+ * @param taskInstanceId id of the task instance to reload
+ */
+ public void refreshTaskInstance(int taskInstanceId) {
+ logger.info("task instance update: {} ", taskInstanceId);
+ TaskInstance refreshed = processService.findTaskInstanceById(taskInstanceId);
+ if (refreshed == null) {
+ logger.error("can not find task instance, id:{}", taskInstanceId);
+ return;
+ }
+ processService.packageTaskInstance(refreshed, processInstance);
+ taskInstanceMap.put(refreshed.getId(), refreshed);
+
+ // drop any stale entry first; only instances flagged YES are re-registered as valid
+ String codeKey = Long.toString(refreshed.getTaskCode());
+ validTaskMap.remove(codeKey);
+ if (Flag.YES == refreshed.getFlag()) {
+ validTaskMap.put(codeKey, refreshed.getId());
+ }
+ }
+
+ /**
+ * check that the state event belongs to the process instance handled by this thread
+ *
+ * @param stateEvent state event to check
+ * @return true if the event's process instance id matches this.processInstance's id;
+ * false (after logging an error) otherwise
+ */
+ public boolean checkProcessInstance(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}",
this.processInstance.getId(),
- stateEvent.toString());
+ stateEvent);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * check that the state event refers to a task instance held in memory
+ *
+ * @param stateEvent state event to check
+ * @return false (after logging an error) if the event carries task instance id 0
+ * (treated as unset) or an id that is not in taskInstanceMap; true otherwise
+ */
+ public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
+ if (stateEvent.getTaskInstanceId() == 0) {
+ logger.error("task instance id null, state event:{}", stateEvent);
+ return false;
+ }
+ if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
+ logger.error("mismatch task instance id, event:{}", stateEvent);
return false;
}
return true;
}
+ /**
+ * check whether any task instance held in memory has the given task code
+ *
+ * @param taskCode task definition code to look for
+ * @return true if some task instance in taskInstanceMap has this task code
+ */
+ public boolean checkTaskInstanceByCode(long taskCode) {
+ if (taskInstanceMap == null || taskInstanceMap.isEmpty()) {
+ return false;
+ }
+ return taskInstanceMap.values().stream()
+ .anyMatch(instance -> instance.getTaskCode() == taskCode);
+ }
+
+ /**
+ * check whether a task instance with the given id is held in memory
+ *
+ * @param taskInstanceId task instance id
+ * @return true if taskInstanceMap contains the id
+ */
+ public boolean checkTaskInstanceById(int taskInstanceId) {
+ boolean hasTasks = taskInstanceMap != null && !taskInstanceMap.isEmpty();
+ return hasTasks && taskInstanceMap.containsKey(taskInstanceId);
+ }
+
+ /**
+ * get task instance from memory
+ *
+ * @param taskInstanceId task instance id
+ * @return the task instance cached in taskInstanceMap, or null if it is not held in memory
+ */
+ public TaskInstance getTaskInstance(int taskInstanceId) {
+ // one lookup: same result as containsKey+get, without the second hash lookup
+ // and without a window where the entry can disappear between the two calls
+ return taskInstanceMap.get(taskInstanceId);
+ }
+
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
- processInstance = processService.findProcessInstanceById(this.processInstance.getId());
if (processComplementData()) {
return true;
}
@@ -477,7 +577,7 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.setStartTime(new Date());
processInstance.setEndTime(null);
processService.saveProcessInstance(processInstance);
- this.taskInstanceHashMap.clear();
+ this.taskInstanceMap.clear();
startProcess();
return true;
}
@@ -491,7 +591,7 @@ public class WorkflowExecuteThread implements Runnable {
}
private void startProcess() throws Exception {
- if (this.taskInstanceHashMap.size() == 0) {
+ if (this.taskInstanceMap.size() == 0) {
isStart = false;
buildFlowDag();
initTaskQueue();
@@ -505,25 +605,22 @@ public class WorkflowExecuteThread implements Runnable {
*/
private void endProcess() {
this.stateEvents.clear();
- processInstance.setEndTime(new Date());
- ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
}
- processService.updateProcessInstance(processInstance);
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
- List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
- ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
+ if (processAlertManager.isNeedToSendWarning(processInstance)) {
+ ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+ processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
+ }
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
- this.processInstance = processService.findProcessInstanceById(processInstance.getId());
int nextInstanceId = processInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
- ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode());
+ ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode());
if (nextProcessInstance == null) {
return;
}
@@ -553,19 +650,21 @@ public class WorkflowExecuteThread implements Runnable {
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
- recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
+ processInstance.setProcessDefinition(processDefinition);
+
+ List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList =
- processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
- forbiddenTaskList.clear();
+ processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
+ forbiddenTaskMap.clear();
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
- forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode);
+ forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
}
});
// generate process to get DAG info
- List<String> recoveryNodeCodeList = getRecoveryNodeCodeList();
+ List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
@@ -584,19 +683,23 @@ public class WorkflowExecuteThread implements Runnable {
taskFailedSubmit = false;
activeTaskProcessorMaps.clear();
- dependFailedTask.clear();
- completeTaskList.clear();
- errorTaskList.clear();
- List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance task : taskInstanceList) {
+ dependFailedTaskMap.clear();
+ completeTaskMap.clear();
+ errorTaskMap.clear();
+
+ List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+ for (TaskInstance task : validTaskInstanceList) {
+ validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ taskInstanceMap.put(task.getId(), task);
+
if (task.isTaskComplete()) {
- completeTaskList.put(Long.toString(task.getTaskCode()), task);
+ completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
- errorTaskList.put(Long.toString(task.getTaskCode()), task);
+ errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
}
@@ -637,31 +740,32 @@ public class WorkflowExecuteThread implements Runnable {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
+
+ // package task instance before submit
+ processService.packageTaskInstance(taskInstance, processInstance);
+
boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval());
- if (submit) {
- this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
- activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
- taskProcessor.run();
- addTimeoutCheck(taskInstance);
- TaskDefinition taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
- taskInstance.setTaskDefine(taskDefinition);
- if (taskProcessor.taskState().typeIsFinished()) {
- StateEvent stateEvent = new StateEvent();
- stateEvent.setProcessInstanceId(this.processInstance.getId());
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setExecutionStatus(taskProcessor.taskState());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- this.stateEvents.add(stateEvent);
- }
- return taskInstance;
- } else {
+ if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return null;
}
+ taskInstanceMap.put(taskInstance.getId(), taskInstance);
+ activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
+ taskProcessor.run();
+
+ addTimeoutCheck(taskInstance);
+
+ if (taskProcessor.taskState().typeIsFinished()) {
+ StateEvent stateEvent = new StateEvent();
+ stateEvent.setProcessInstanceId(this.processInstance.getId());
+ stateEvent.setTaskInstanceId(taskInstance.getId());
+ stateEvent.setExecutionStatus(taskProcessor.taskState());
+ stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+ this.stateEvents.add(stateEvent);
+ }
+ return taskInstance;
} catch (Exception e) {
logger.error("submit standby task error", e);
return null;
@@ -688,11 +792,12 @@ public class WorkflowExecuteThread implements Runnable {
if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
- TaskDefinition taskDefinition = processService.findTaskDefinition(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion()
- );
- taskInstance.setTaskDefine(taskDefinition);
+ TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+ if (taskDefinition == null) {
+ logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+ return;
+ }
+
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
} else {
@@ -711,8 +816,8 @@ public class WorkflowExecuteThread implements Runnable {
* @return TaskInstance
*/
private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
- List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId());
- for (TaskInstance taskInstance : taskInstanceList) {
+ List<TaskInstance> validTaskInstanceList = getValidTaskList();
+ for (TaskInstance taskInstance : validTaskInstanceList) {
if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == taskVersion) {
return taskInstance;
}
@@ -805,7 +910,11 @@ public class WorkflowExecuteThread implements Runnable {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
- TaskInstance preTaskInstance = completeTaskList.get(preTaskCode);
+ Integer taskId = completeTaskMap.get(preTaskCode);
+ if (taskId == null) {
+ continue;
+ }
+ TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
if (preTaskInstance == null) {
continue;
}
@@ -854,12 +963,35 @@ public class WorkflowExecuteThread implements Runnable {
}
}
+ /**
+ * get complete task instance map, taskCode as key
+ *
+ * @return map from task code to the completed task instance held in memory;
+ * ids in completeTaskMap that have no cached task instance are skipped
+ */
+ private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
+ Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
+ for (Integer taskInstanceId : completeTaskMap.values()) {
+ TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+ if (taskInstance == null) {
+ // the id may have no cached instance; skip it instead of throwing NPE
+ continue;
+ }
+ completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
+ }
+ return completeTaskInstanceMap;
+ }
+ }
+
+ /**
+ * get valid task list
+ *
+ * @return task instances referenced by validTaskMap that are held in memory;
+ * the list never contains null elements
+ */
+ private List<TaskInstance> getValidTaskList() {
+ List<TaskInstance> validTaskInstanceList = new ArrayList<>(validTaskMap.size());
+ for (Integer taskInstanceId : validTaskMap.values()) {
+ TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+ // callers dereference every element (e.g. getTaskCode()), so never add a null entry
+ if (taskInstance != null) {
+ validTaskInstanceList.add(taskInstance);
+ }
+ }
+ return validTaskInstanceList;
+ }
+
private void submitPostNode(String parentNodeCode) {
- Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList);
+ Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
- if (taskInstanceHashMap.containsColumn(taskNodeObject.getCode())) {
+ if (checkTaskInstanceByCode(taskNodeObject.getCode())) {
continue;
}
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@@ -873,15 +1005,16 @@ public class WorkflowExecuteThread implements Runnable {
continue;
}
- if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
+ if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
logger.info("task {} has already run success", task.getName());
continue;
}
if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
- } else {
- addTaskToStandByList(task);
+ continue;
}
+
+ addTaskToStandByList(task);
}
submitStandByTask();
updateProcessInstanceState();
@@ -903,15 +1036,16 @@ public class WorkflowExecuteThread implements Runnable {
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (!dag.containsNode(depsNode)
- || forbiddenTaskList.containsKey(depsNode)
- || skipTaskNodeList.containsKey(depsNode)) {
+ || forbiddenTaskMap.containsKey(depsNode)
+ || skipTaskNodeMap.containsKey(depsNode)) {
continue;
}
// dependencies must be fully completed
- if (!completeTaskList.containsKey(depsNode)) {
+ if (!completeTaskMap.containsKey(depsNode)) {
return DependResult.WAITING;
}
- ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
+ Integer depsTaskId = completeTaskMap.get(depsNode);
+ ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
@@ -923,7 +1057,7 @@ public class WorkflowExecuteThread implements Runnable {
return DependResult.FAILED;
}
}
- logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
+ logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
return DependResult.SUCCESS;
}
@@ -933,12 +1067,13 @@ public class WorkflowExecuteThread implements Runnable {
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) {
if (dag.getNode(dependNodeName).isConditionsTask()) {
//condition task need check the branch to run
- List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
+ List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
return false;
}
} else {
- ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
+ Integer taskInstanceId = completeTaskMap.get(dependNodeName);
+ ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.typeIsFailure()) {
return false;
}
@@ -954,9 +1089,10 @@ public class WorkflowExecuteThread implements Runnable {
*/
private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
List<TaskInstance> resultList = new ArrayList<>();
- for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
- if (entry.getValue().getState() == state) {
- resultList.add(entry.getValue());
+ for (Integer taskInstanceId : completeTaskMap.values()) {
+ TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
+ if (taskInstance != null && taskInstance.getState() == state) {
+ resultList.add(taskInstance);
}
}
return resultList;
@@ -990,10 +1126,10 @@ public class WorkflowExecuteThread implements Runnable {
if (this.taskFailedSubmit) {
return true;
}
- if (this.errorTaskList.size() > 0) {
+ if (this.errorTaskMap.size() > 0) {
return true;
}
- return this.dependFailedTask.size() > 0;
+ return this.dependFailedTaskMap.size() > 0;
}
/**
@@ -1049,7 +1185,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* generate the latest process instance status by the tasks state
*
- * @param instance
* @return process instance execution status
*/
private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {
@@ -1130,8 +1265,7 @@ public class WorkflowExecuteThread implements Runnable {
* after each batch of tasks is executed, the status of the process instance is updated
*/
private void updateProcessInstanceState() {
- ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
- ExecutionStatus state = getProcessInstanceState(instance);
+ ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
@@ -1139,9 +1273,12 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getState(), state,
processInstance.getCommandType());
- instance.setState(state);
- processService.updateProcessInstance(instance);
- processInstance = instance;
+ processInstance.setState(state);
+ if (state.typeIsFinished()) {
+ processInstance.setEndTime(new Date());
+ }
+ processService.updateProcessInstance(processInstance);
+
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId());
@@ -1254,7 +1391,8 @@ public class WorkflowExecuteThread implements Runnable {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
- completeTaskList.put(Long.toString(task.getTaskCode()), task);
+ completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
@@ -1281,7 +1419,7 @@ public class WorkflowExecuteThread implements Runnable {
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTask.put(Long.toString(task.getTaskCode()), task);
+ dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
@@ -1366,10 +1504,10 @@ public class WorkflowExecuteThread implements Runnable {
*
* @return recovery node code list
*/
- private List<String> getRecoveryNodeCodeList() {
+ private List<String> getRecoveryNodeCodeList(List<TaskInstance> recoverNodeList) {
List<String> recoveryNodeCodeList = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
- for (TaskInstance task : recoverNodeIdList) {
+ if (CollectionUtils.isNotEmpty(recoverNodeList)) {
+ for (TaskInstance task : recoverNodeList) {
recoveryNodeCodeList.add(Long.toString(task.getTaskCode()));
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index fb14d96..7194ff9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -161,8 +161,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
- processService.setTaskInstanceDetail(taskInstance);
-
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
@@ -172,12 +170,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
- null,
- taskInstance.getId());
+ null
+ );
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
- String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+ String userQueue = processService.queryUserQueueByProcessInstance(taskInstance.getProcessInstance());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setResources(getResourceFullNames(taskInstance));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ee1c548..7b193bf 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -65,7 +65,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index ee1cf82..ee03602 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -72,7 +72,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 8c3a287..d873a81 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@@ -81,7 +80,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = task;
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index e0cd3e8..421efd3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -54,7 +54,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion()
);
- this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 116a8d5..70013e1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -63,7 +63,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
- this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+ this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index c8a4ae4..4207d30 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -89,7 +89,7 @@ public class ConditionsTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
- .submitTask(taskInstance))
+ .submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 1fa37cc..74017f0 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -93,6 +93,7 @@ public class DependentTaskTest {
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance();
+ taskInstance = getTaskInstance();
// for MasterBaseTaskExecThread.call
// for DependentTaskExecThread.waitTaskQuit
@@ -102,7 +103,7 @@ public class DependentTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
- .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000)))
+ .submitTask(processInstance, taskInstance))
.thenAnswer(i -> taskInstance);
// for DependentTaskExecThread.initTaskParameters
@@ -346,6 +347,12 @@ public class DependentTaskTest {
return processInstance;
}
+ private TaskInstance getTaskInstance() {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1000);
+ return taskInstance;
+ }
+
/**
* task that dependent on others (and to be tested here)
* notice: should be filled with setDependence() and be passed to setupTaskInstance()
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 5fcfc77..8af6190 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -74,6 +74,8 @@ public class SubProcessTaskTest {
Mockito.when(applicationContext.getBean(AlertDao.class)).thenReturn(alertDao);
processInstance = getProcessInstance();
+ TaskInstance taskInstance = getTaskInstance();
+
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
@@ -85,7 +87,7 @@ public class SubProcessTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
- .submitTask(Mockito.any()))
+ .submitTask(processInstance, taskInstance))
- .thenAnswer(t -> t.getArgument(0));
+ .thenAnswer(t -> t.getArgument(1)); // submitTask(processInstance, taskInstance): echo back the TaskInstance (2nd arg)
TaskDefinition taskDefinition = new TaskDefinition();
@@ -147,6 +149,12 @@ public class SubProcessTaskTest {
return processInstance;
}
+ private TaskInstance getTaskInstance() {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1000);
+ return taskInstance;
+ }
+
private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(102);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
index 3221edb..4516d9d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -81,7 +81,7 @@ public class SwitchTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
- .submitTask(taskInstance))
+ .submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 911d0d7..87bc042 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -162,39 +162,52 @@ public class WorkflowExecuteThreadTest {
public void testGetPreVarPool() {
try {
Set<String> preTaskName = new HashSet<>();
- preTaskName.add("test1");
- preTaskName.add("test2");
- Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+ preTaskName.add(Long.toString(1));
+ preTaskName.add(Long.toString(2));
TaskInstance taskInstance = new TaskInstance();
TaskInstance taskInstance1 = new TaskInstance();
taskInstance1.setId(1);
- taskInstance1.setName("test1");
+ taskInstance1.setTaskCode(1);
taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
taskInstance1.setEndTime(new Date());
TaskInstance taskInstance2 = new TaskInstance();
taskInstance2.setId(2);
- taskInstance2.setName("test2");
+ taskInstance2.setTaskCode(2);
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
taskInstance2.setEndTime(new Date());
- completeTaskList.put("test1", taskInstance1);
- completeTaskList.put("test2", taskInstance2);
+ Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
+ taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
+ taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
+
+ Map<String, Integer> completeTaskList = new ConcurrentHashMap<>(); // task code (as String) -> task instance id
+ completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
+ completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
- Field field = masterExecThreadClass.getDeclaredField("completeTaskList");
- field.setAccessible(true);
- field.set(workflowExecuteThread, completeTaskList);
+ Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
+ completeTaskMapField.setAccessible(true);
+ completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+
+ Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
+ taskInstanceMapField.setAccessible(true);
+ taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
+
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
- completeTaskList.put("test2", taskInstance2);
- field.setAccessible(true);
- field.set(workflowExecuteThread, completeTaskList);
+ completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
+
+ completeTaskMapField.setAccessible(true);
+ completeTaskMapField.set(workflowExecuteThread, completeTaskList);
+ taskInstanceMapField.setAccessible(true);
+ taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
+
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
} catch (Exception e) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
deleted file mode 100644
index f609845..0000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.cache.impl;
-
-import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TaskInstanceCacheManagerImplTest {
-
- @InjectMocks
- private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
-
- @Mock(name = "processService")
- private ProcessService processService;
-
- @Before
- public void before() {
-
- TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
- taskExecuteAckCommand.setStatus(1);
- taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
- taskExecuteAckCommand.setHost("worker007");
- taskExecuteAckCommand.setLogPath("/temp/worker.log");
- taskExecuteAckCommand.setStartTime(new Date(1970, Calendar.AUGUST,7));
- taskExecuteAckCommand.setTaskInstanceId(0);
-
- taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
-
- }
-
- @Test
- public void testInit() throws InterruptedException {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(0);
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- taskInstance.setExecutePath("/dolphinscheduler/worker");
- taskInstance.setHost("worker007");
- taskInstance.setLogPath("/temp/worker.log");
- taskInstance.setProcessInstanceId(0);
-
- Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
-
- taskInstanceCacheManager.init();
- TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000);
-
- Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState());
-
- }
-
- @Test
- public void getByTaskInstanceIdFromCache() {
- TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(0);
- taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- taskInstance.setExecutePath("/dolphinscheduler/worker");
- taskInstance.setHost("worker007");
- taskInstance.setLogPath("/temp/worker.log");
- taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
-
- Assert.assertEquals(taskInstance.toString(), instanceGot.toString());
-
- }
-
- @Test
- public void getByTaskInstanceIdFromDatabase() {
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- taskInstance.setExecutePath("/dolphinscheduler/worker");
- taskInstance.setHost("worker007");
- taskInstance.setLogPath("/temp/worker.log");
- taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
-
- Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
-
- TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);
-
- Assert.assertEquals(taskInstance, instanceGot);
-
- }
-
- @Test
- public void cacheTaskInstanceByTaskExecutionContext() {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskInstanceId(2);
- taskExecutionContext.setTaskName("blackberrier test");
- taskExecutionContext.setStartTime(new Date(1970, Calendar.AUGUST,7));
- taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
- taskExecutionContext.setExecutePath("/tmp");
-
- taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);
-
- TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);
-
- Assert.assertEquals(taskInstance.getId(), 2);
- Assert.assertEquals(taskInstance.getName(), "blackberrier test");
- Assert.assertEquals(taskInstance.getStartTime(), new Date(1970, Calendar.AUGUST, 7));
- Assert.assertEquals(taskInstance.getTaskType(), TaskType.SPARK.getDesc());
- Assert.assertEquals(taskInstance.getExecutePath(), "/tmp");
-
- }
-
- @Test
- public void testCacheTaskInstanceByTaskExecuteAckCommand() {
- TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
-
- Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
- Assert.assertEquals(new Date(1970, Calendar.AUGUST, 7), taskInstance.getStartTime());
- Assert.assertEquals("worker007", taskInstance.getHost());
- Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath());
- Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath());
-
- }
-
- @Test
- public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
- TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
- responseCommand.setTaskInstanceId(0);
- responseCommand.setStatus(9);
- responseCommand.setEndTime(new Date(1970, Calendar.AUGUST, 8));
-
- taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
- TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
-
- Assert.assertEquals(new Date(1970, Calendar.AUGUST, 8), taskInstance.getEndTime());
- Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
-
- }
-
- @Test
- public void removeByTaskInstanceId() {
- taskInstanceCacheManager.removeByTaskInstanceId(0);
- Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
-
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index f9d51a9..99b6f79 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -76,8 +76,6 @@ public class TaskPriorityQueueConsumerTest {
tenant.setUpdateTime(new Date());
Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
-
- Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
}
@Test
@@ -101,7 +99,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@@ -129,7 +126,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@@ -171,7 +167,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@@ -211,7 +206,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@@ -271,7 +265,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
@@ -310,7 +303,6 @@ public class TaskPriorityQueueConsumerTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority();
@@ -342,7 +334,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
index e215d4c..823ffa2 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -18,20 +18,16 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.net.InetSocketAddress;
import java.util.Date;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -39,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
- * task ack processor test
+ * task ack processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class})
@@ -47,7 +43,6 @@ public class TaskAckProcessorTest {
private TaskAckProcessor taskAckProcessor;
private TaskResponseService taskResponseService;
- private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
private ProcessService processService;
private TaskExecuteAckCommand taskExecuteAckCommand;
private TaskResponseEvent taskResponseEvent;
@@ -60,9 +55,6 @@ public class TaskAckProcessorTest {
taskResponseService = PowerMockito.mock(TaskResponseService.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
- taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class);
- PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager);
-
processService = PowerMockito.mock(ProcessService.class);
PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
index e7afa14..55828e1 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
@@ -86,7 +86,6 @@ public class CommonTaskProcessorTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
- Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
index 4429e7c..8d1faa8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
@@ -68,11 +67,6 @@ public class DependencyConfig {
}
@Bean
- public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
- return Mockito.mock(TaskInstanceCacheManagerImpl.class);
- }
-
- @Bean
public ProcessService processService() {
return Mockito.mock(ProcessService.class);
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
index f4876a6..0ac2372 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
@@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
@@ -60,11 +59,6 @@ public class TaskCallbackServiceTestConfig {
}
@Bean
- public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
- return Mockito.mock(TaskInstanceCacheManagerImpl.class);
- }
-
- @Bean
public ProcessService processService() {
return Mockito.mock(ProcessService.class);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index c2db565..669bd4e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -205,43 +205,54 @@ public class ProcessAlertManager {
List<TaskInstance> taskInstances,
ProjectUser projectUser) {
- if (Flag.YES == processInstance.getIsSubProcess()) {
+ if (!isNeedToSendWarning(processInstance)) {
return;
}
- boolean sendWarnning = false;
+
+ Alert alert = new Alert();
+
+ String cmdName = getCommandCnName(processInstance.getCommandType());
+ String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
+ alert.setTitle(cmdName + " " + success);
+ String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
+ alert.setContent(content);
+ alert.setAlertGroupId(processInstance.getWarningGroupId());
+ alert.setCreateTime(new Date());
+ alertDao.addAlert(alert);
+ logger.info("add alert to db , alert: {}", alert);
+ }
+
+ /**
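+ * Check whether a warning alert should be sent for the given process instance.
+ *
+ * @param processInstance process instance whose sub-process flag, warning type and state are checked
+ * @return false for sub-processes; otherwise true when the state matches the warning type (ALL: finished, SUCCESS: success, FAILURE: failure)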
+ */
+ public boolean isNeedToSendWarning(ProcessInstance processInstance) {
+ if (Flag.YES == processInstance.getIsSubProcess()) {
+ return false;
+ }
+ boolean sendWarning = false;
WarningType warningType = processInstance.getWarningType();
switch (warningType) {
case ALL:
if (processInstance.getState().typeIsFinished()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
case SUCCESS:
if (processInstance.getState().typeIsSuccess()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
case FAILURE:
if (processInstance.getState().typeIsFailure()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
default:
}
- if (!sendWarnning) {
- return;
- }
- Alert alert = new Alert();
-
- String cmdName = getCommandCnName(processInstance.getCommandType());
- String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
- alert.setTitle(cmdName + " " + success);
- String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
- alert.setContent(content);
- alert.setAlertGroupId(processInstance.getWarningGroupId());
- alert.setCreateTime(new Date());
- alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert);
+ return sendWarning;
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf0706c..fbf29b3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,6 +100,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -1087,24 +1088,17 @@ public class ProcessService {
/**
* retry submit task to db
*/
- public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
-
+ public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
int retryTimes = 1;
- boolean submitDB = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes) {
try {
- if (!submitDB) {
- // submit task to db
- task = submitTask(taskInstance);
- if (task != null && task.getId() != 0) {
- submitDB = true;
- break;
- }
- }
- if (!submitDB) {
- logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
+ // submit task to db
+ task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
+ if (task != null && task.getId() != 0) {
+ break;
}
+ logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to mysql failed", e);
@@ -1118,12 +1112,12 @@ public class ProcessService {
* submit task to db
* submit sub process to command
*
+ * @param processInstance processInstance
* @param taskInstance taskInstance
* @return task instance
*/
@Transactional(rollbackFor = Exception.class)
- public TaskInstance submitTask(TaskInstance taskInstance) {
- ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+ public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
@@ -1131,8 +1125,9 @@ public class ProcessService {
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
- return task;
+ return null;
}
+
if (!task.getState().typeIsFinished()) {
createSubWorkProcess(processInstance, task);
}
@@ -1383,7 +1378,7 @@ public class ProcessService {
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
- taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
+ taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
@@ -1406,10 +1401,10 @@ public class ProcessService {
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
- * @param processInstanceState processInstanceState
+ * @param processInstance processInstance
* @return process instance state
*/
- public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
+ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus state = taskInstance.getState();
// running, delayed or killed
// the task already exists in task queue
@@ -1423,10 +1418,10 @@ public class ProcessService {
}
//return pasue /stop if process instance state is ready pause / stop
// or return submit success
- if (processInstanceState == ExecutionStatus.READY_PAUSE) {
+ if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
- } else if (processInstanceState == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance)) {
+ } else if (processInstance.getState() == ExecutionStatus.READY_STOP
+ || !checkProcessStrategy(taskInstance, processInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1440,8 +1435,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return check strategy result
*/
- private boolean checkProcessStrategy(TaskInstance taskInstance) {
- ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
+ private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
FailureStrategy failureStrategy = processInstance.getFailureStrategy();
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
@@ -1535,39 +1529,15 @@ public class ProcessService {
}
/**
- * package task instance，associate processInstance and processDefine
- *
- * @param taskInstId taskInstId
- * @return task instance
+ * package task instance
*/
- public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId) {
- // get task instance
- TaskInstance taskInstance = findTaskInstanceById(taskInstId);
- if (taskInstance == null) {
- return null;
- }
- setTaskInstanceDetail(taskInstance);
- return taskInstance;
- }
-
- /**
- * package task instance，associate processInstance and processDefine
- *
- * @param taskInstance taskInstance
- * @return task instance
- */
- public void setTaskInstanceDetail(TaskInstance taskInstance) {
- // get process instance
- ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- // get process define
- ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
taskInstance.setProcessInstance(processInstance);
- taskInstance.setProcessDefine(processDefine);
- TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+ taskInstance.setProcessDefine(processInstance.getProcessDefinition());
+ TaskDefinition taskDefinition = this.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
- updateTaskDefinitionResources(taskDefinition);
+ this.updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
}
@@ -1576,7 +1546,7 @@ public class ProcessService {
*
* @param taskDefinition the given {@link TaskDefinition}
*/
- private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
+ public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
new TypeReference<Map<String, Object>>() {
@@ -1757,12 +1727,10 @@ public class ProcessService {
* @param host host
* @param executePath executePath
* @param logPath logPath
- * @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
- String logPath,
- int taskInstId) {
+ String logPath) {
taskInstance.setState(state);
taskInstance.setStartTime(startTime);
taskInstance.setHost(host);
@@ -1786,14 +1754,12 @@ public class ProcessService {
*
* @param state state
* @param endTime endTime
- * @param taskInstId taskInstId
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
int processId,
String appIds,
- int taskInstId,
String varPool) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
@@ -2047,15 +2013,14 @@ public class ProcessService {
}
/**
- * query user queue by process instance id
+ * query user queue by process instance
*
- * @param processInstanceId processInstanceId
+ * @param processInstance processInstance
* @return queue
*/
- public String queryUserQueueByProcessInstanceId(int processInstanceId) {
+ public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
String queue = "";
- ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
if (processInstance == null) {
return queue;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index d78fb98..2cbd298 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Objects;
/**
- * task priority info
+ * task priority info
*/
public class TaskPriority implements Comparable<TaskPriority> {
@@ -62,7 +62,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
*/
private Map<String, String> context;
- public TaskPriority(){}
+ /**
+ * checkpoint
+ */
+ private long checkpoint;
+
+ public TaskPriority() {
+ this.checkpoint = System.currentTimeMillis();
+ }
public TaskPriority(int processInstancePriority,
int processInstanceId,
@@ -73,6 +80,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.groupName = groupName;
+ this.checkpoint = System.currentTimeMillis();
}
public int getProcessInstancePriority() {
@@ -131,6 +139,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.taskExecutionContext = taskExecutionContext;
}
+ public long getCheckpoint() {
+ return checkpoint;
+ }
+
+ public void setCheckpoint(long checkpoint) {
+ this.checkpoint = checkpoint;
+ }
+
@Override
public int compareTo(TaskPriority other) {
if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
@@ -174,7 +190,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
}
TaskPriority that = (TaskPriority) o;
return processInstancePriority == that.processInstancePriority
- && processInstanceId == that.processInstanceId
+ && processInstanceId == that.processInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& Objects.equals(groupName, that.groupName);