You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/05 09:40:28 UTC
[dolphinscheduler] branch 3.0.0-prepare updated: Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.0-prepare by this push:
new 230adbf621 Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)
230adbf621 is described below
commit 230adbf62196dd17cad1623e5b10bd6c5a14cf00
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Aug 5 17:40:22 2022 +0800
Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)
* Fix TaskGroup cannot work and will cause master dead loop
* Remove acquireTaskGroupAgain in ProcessServiceImpl
(cherry picked from commit acd3d3fab14d08b3b06015be3ac9cfb2f4bfc934)
---
.../dao/entity/TaskGroupQueue.java | 4 +-
.../dao/mapper/TaskGroupMapper.java | 3 +
.../dao/mapper/TaskGroupQueueMapper.java | 3 +
.../server/master/event/TaskStateEventHandler.java | 1 +
.../master/runner/WorkflowExecuteRunnable.java | 20 +++++-
.../master/runner/task/CommonTaskProcessor.java | 18 +-----
.../service/process/ProcessService.java | 4 +-
.../service/process/ProcessServiceImpl.java | 72 +++++++++++-----------
8 files changed, 65 insertions(+), 60 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index 6399f12ada..b9959eb96e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
- * taskIntanceid
+ * taskInstanceId
*/
private int taskId;
/**
@@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable {
*/
private int groupId;
/**
- * processInstace id
+ * processInstance id
*/
private int processId;
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index d12ad9d0d2..bd7ffd6cba 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -72,6 +72,9 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
*/
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
+ /**
+ * Select the count of task groups with the given id whose groupSize > useSize
+ */
int selectAvailableCountById(@Param("groupId") int groupId);
int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index 5fda409432..38f5ca6016 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
*/
int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status);
+ /**
+ * Query the {@link TaskGroupQueue} entries whose priority > the given <code>priority</code>
+ */
List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status);
TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
index e3ad268f97..dc4cc2721e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler {
}
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
+ logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
workflowExecuteRunnable.releaseTaskGroup(task);
}
return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 005a41d26e..09e5318ca5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -332,7 +332,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
- boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
+ boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
@@ -409,6 +409,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @param taskInstance
*/
public void releaseTaskGroup(TaskInstance taskInstance) {
+ logger.info("Release task group");
if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) {
@@ -922,6 +923,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
+
+ // if we use task group, then need to acquire the task group resource
+ // if there is no resource the current task instance will not be dispatched
+ // it will be woken up when other tasks release the resource.
+ int taskGroupId = taskInstance.getTaskGroupId();
+ if (taskGroupId > 0) {
+ boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
+ taskInstance.getName(),
+ taskGroupId,
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getTaskGroupPriority());
+ if (!acquireTaskGroup) {
+ logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
+ return Optional.of(taskInstance);
+ }
+ }
+
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 0dec26defe..6e7ab96d66 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
protected boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
- if (this.taskInstance == null) {
- return false;
- }
-
- int taskGroupId = taskInstance.getTaskGroupId();
- if (taskGroupId > 0) {
- boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
- taskInstance.getName(),
- taskGroupId,
- taskInstance.getProcessInstanceId(),
- taskInstance.getTaskGroupPriority());
- if (!acquireTaskGroup) {
- logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
- return true;
- }
- }
- return true;
+ return this.taskInstance != null;
}
@Override
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 47e4732a4f..0c1db7f6e9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -271,9 +271,7 @@ public interface ProcessService {
String taskName, int groupId,
int processId, int priority);
- boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue);
-
- boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue);
+ boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue);
void releaseAllTaskGroup(int processInstanceId);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 2e1f63fbcb..d91f630633 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
-import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -350,7 +351,7 @@ public class ProcessServiceImpl implements ProcessService {
/**
* Save error command, and delete original command. If the given command has already been moved into error command,
- * will throw {@link SQLIntegrityConstraintViolationException ).
+ * will throw {@link java.sql.SQLIntegrityConstraintViolationException}.
*
* @param command command
* @param message message
@@ -2871,21 +2872,22 @@ public class ProcessServiceImpl implements ProcessService {
* @param taskId task id
*/
@Override
- public boolean acquireTaskGroup(int taskId,
- String taskName, int groupId,
- int processId, int priority) {
+ public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) {
TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
if (taskGroup == null) {
+ // we don't throw an exception here, because the task group may have been deleted while the workflow is running
return true;
}
// if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) {
return true;
}
+ // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
} else {
+ logger.info("The task queue is already exist, taskId: {}", taskId);
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
@@ -2893,15 +2895,14 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
- //check priority
+ //check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
- this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
//try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
- if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
+ if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
return true;
}
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
@@ -2912,10 +2913,11 @@ public class ProcessServiceImpl implements ProcessService {
* try to get the task group resource(when other task release the resource)
*/
@Override
- public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
+ public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
- int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+ int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
+ taskGroupQueue.getId(),
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
@@ -2925,11 +2927,6 @@ public class ProcessServiceImpl implements ProcessService {
return false;
}
- @Override
- public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
- return robTaskGroupResouce(taskGroupQueue);
- }
-
@Override
public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
@@ -2946,40 +2943,41 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
- TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
- if (taskGroup == null) {
- return null;
- }
- TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
- if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
- return null;
- }
+ TaskGroup taskGroup;
+ TaskGroupQueue thisTaskGroupQueue;
try {
- while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
- , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
+ do {
+ taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
+ if (taskGroup == null) {
+ return null;
+ }
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
- taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
- }
+ } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
+ && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
+ taskGroup.getUseSize(),
+ thisTaskGroupQueue.getId(),
+ TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
logger.error("release the task group error", e);
+ return null;
}
logger.info("updateTask:{}", taskInstance.getName());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
- TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
- if (taskGroupQueue == null) {
- return null;
- }
- while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
+ TaskGroupQueue taskGroupQueue;
+ do {
taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
- TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
+ TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
+ Flag.NO.getCode(),
+ Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
- }
+ } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
+ Flag.YES.getCode(),
+ taskGroupQueue.getId()) != 1);
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
@@ -3006,7 +3004,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param groupId group id
* @param processId process id
* @param priority priority
- * @return result and msg code
+ * @return inserted task group queue
*/
@Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,