You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/05 09:40:28 UTC

[dolphinscheduler] branch 3.0.0-prepare updated: Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.0-prepare by this push:
     new 230adbf621 Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)
230adbf621 is described below

commit 230adbf62196dd17cad1623e5b10bd6c5a14cf00
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Aug 5 17:40:22 2022 +0800

    Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)
    
    * Fix TaskGroup cannot work and will cause master dead loop
    
    * Remove acquireTaskGroupAgain in ProcessServiceImpl
    
    (cherry picked from commit acd3d3fab14d08b3b06015be3ac9cfb2f4bfc934)
---
 .../dao/entity/TaskGroupQueue.java                 |  4 +-
 .../dao/mapper/TaskGroupMapper.java                |  3 +
 .../dao/mapper/TaskGroupQueueMapper.java           |  3 +
 .../server/master/event/TaskStateEventHandler.java |  1 +
 .../master/runner/WorkflowExecuteRunnable.java     | 20 +++++-
 .../master/runner/task/CommonTaskProcessor.java    | 18 +-----
 .../service/process/ProcessService.java            |  4 +-
 .../service/process/ProcessServiceImpl.java        | 72 +++++++++++-----------
 8 files changed, 65 insertions(+), 60 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
index 6399f12ada..b9959eb96e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
@@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable {
     @TableId(value = "id", type = IdType.AUTO)
     private int id;
     /**
-     * taskIntanceid
+     * taskInstanceId
      */
     private int taskId;
     /**
@@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable {
      */
     private int groupId;
     /**
-     * processInstace id
+     * processInstance id
      */
     private int processId;
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
index d12ad9d0d2..bd7ffd6cba 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
@@ -72,6 +72,9 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
      */
     TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
 
+    /**
+     * Select the count of task groups whose groupSize > useSize
+     */
     int selectAvailableCountById(@Param("groupId") int groupId);
 
     int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
index 5fda409432..38f5ca6016 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
@@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
      */
     int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status);
 
+    /**
+     * Query the {@link TaskGroupQueue}, whose priority > the given <code>priority</code>
+     */
     List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status);
 
     TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
index e3ad268f97..dc4cc2721e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler {
             }
             workflowExecuteRunnable.taskFinished(task);
             if (task.getTaskGroupId() > 0) {
+                logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
                 workflowExecuteRunnable.releaseTaskGroup(task);
             }
             return true;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 005a41d26e..09e5318ca5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -332,7 +332,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             return true;
         }
         if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
-            boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
+            boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
             if (acquireTaskGroup) {
                 TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
                 ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
@@ -409,6 +409,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      * @param taskInstance
      */
     public void releaseTaskGroup(TaskInstance taskInstance) {
+        logger.info("Release task group");
         if (taskInstance.getTaskGroupId() > 0) {
             TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
             if (nextTaskInstance != null) {
@@ -922,6 +923,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
             taskInstanceMap.put(taskInstance.getId(), taskInstance);
             activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
+
+            // if we use task group, then need to acquire the task group resource
+            // if there is no resource the current task instance will not be dispatched
+            // it will be woken up when other tasks release the resource.
+            int taskGroupId = taskInstance.getTaskGroupId();
+            if (taskGroupId > 0) {
+                boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
+                                                                           taskInstance.getName(),
+                                                                           taskGroupId,
+                                                                           taskInstance.getProcessInstanceId(),
+                                                                           taskInstance.getTaskGroupPriority());
+                if (!acquireTaskGroup) {
+                    logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
+                    return Optional.of(taskInstance);
+                }
+            }
+
             boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
             if (!dispatchSuccess) {
                 logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 0dec26defe..6e7ab96d66 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     protected boolean submitTask() {
         this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
 
-        if (this.taskInstance == null) {
-            return false;
-        }
-
-        int taskGroupId = taskInstance.getTaskGroupId();
-        if (taskGroupId > 0) {
-            boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
-                    taskInstance.getName(),
-                    taskGroupId,
-                    taskInstance.getProcessInstanceId(),
-                    taskInstance.getTaskGroupPriority());
-            if (!acquireTaskGroup) {
-                logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
-                return true;
-            }
-        }
-        return true;
+        return this.taskInstance != null;
     }
 
     @Override
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 47e4732a4f..0c1db7f6e9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -271,9 +271,7 @@ public interface ProcessService {
                              String taskName, int groupId,
                              int processId, int priority);
 
-    boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue);
-
-    boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue);
+    boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue);
 
     void releaseAllTaskGroup(int processInstanceId);
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 2e1f63fbcb..d91f630633 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
 import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
+import static java.util.stream.Collectors.toSet;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -350,7 +351,7 @@ public class ProcessServiceImpl implements ProcessService {
 
     /**
      * Save error command, and delete original command. If the given command has already been moved into error command,
-     * will throw {@link SQLIntegrityConstraintViolationException ).
+     * will throw {@link java.sql.SQLIntegrityConstraintViolationException}.
      *
      * @param command command
      * @param message message
@@ -2871,21 +2872,22 @@ public class ProcessServiceImpl implements ProcessService {
      * @param taskId task id
      */
     @Override
-    public boolean acquireTaskGroup(int taskId,
-                                    String taskName, int groupId,
-                                    int processId, int priority) {
+    public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) {
         TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
         if (taskGroup == null) {
+            // we don't throw exception here, to avoid the task group has been deleted during workflow running
             return true;
         }
         // if task group is not applicable
         if (taskGroup.getStatus() == Flag.NO.getCode()) {
             return true;
         }
+        // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
         TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
         if (taskGroupQueue == null) {
             taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
         } else {
+            logger.info("The task queue is already exist, taskId: {}", taskId);
             if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
                 return true;
             }
@@ -2893,15 +2895,14 @@ public class ProcessServiceImpl implements ProcessService {
             taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
             this.taskGroupQueueMapper.updateById(taskGroupQueue);
         }
-        //check priority
+        //check if there already exist higher priority tasks
         List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
         if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
-            this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
             return false;
         }
         //try to get taskGroup
         int count = taskGroupMapper.selectAvailableCountById(groupId);
-        if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
+        if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
             return true;
         }
         this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
@@ -2912,10 +2913,11 @@ public class ProcessServiceImpl implements ProcessService {
      * try to get the task group resource(when other task release the resource)
      */
     @Override
-    public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
+    public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
         TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
-        int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(),
-            TaskGroupQueueStatus.WAIT_QUEUE.getCode());
+        int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
+                                                                    taskGroupQueue.getId(),
+                                                                    TaskGroupQueueStatus.WAIT_QUEUE.getCode());
         if (affectedCount > 0) {
             taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
             this.taskGroupQueueMapper.updateById(taskGroupQueue);
@@ -2925,11 +2927,6 @@ public class ProcessServiceImpl implements ProcessService {
         return false;
     }
 
-    @Override
-    public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
-        return robTaskGroupResouce(taskGroupQueue);
-    }
-
     @Override
     public void releaseAllTaskGroup(int processInstanceId) {
         List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
@@ -2946,40 +2943,41 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
 
-        TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
-        if (taskGroup == null) {
-            return null;
-        }
-        TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
-        if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
-            return null;
-        }
+        TaskGroup taskGroup;
+        TaskGroupQueue thisTaskGroupQueue;
         try {
-            while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
-                , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
+            do {
+                taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
+                if (taskGroup == null) {
+                    return null;
+                }
                 thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
                 if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
                     return null;
                 }
-                taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
-            }
+            } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
+                && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
+                                                            taskGroup.getUseSize(),
+                                                            thisTaskGroupQueue.getId(),
+                                                            TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
         } catch (Exception e) {
             logger.error("release the task group error", e);
+            return null;
         }
         logger.info("updateTask:{}", taskInstance.getName());
         changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
-        TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
-            TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
-        if (taskGroupQueue == null) {
-            return null;
-        }
-        while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
+        TaskGroupQueue taskGroupQueue;
+        do {
             taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
-                TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
+                                                                                    TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
+                                                                                    Flag.NO.getCode(),
+                                                                                    Flag.NO.getCode());
             if (taskGroupQueue == null) {
                 return null;
             }
-        }
+        } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
+                                                            Flag.YES.getCode(),
+                                                            taskGroupQueue.getId()) != 1);
         return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
     }
 
@@ -3006,7 +3004,7 @@ public class ProcessServiceImpl implements ProcessService {
      * @param groupId   group id
      * @param processId process id
      * @param priority  priority
-     * @return result and msg code
+     * @return inserted task group queue
      */
     @Override
     public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,