You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/23 02:05:11 UTC

[dolphinscheduler] branch dev updated: Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4d13a5104b Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)
4d13a5104b is described below

commit 4d13a5104b9a4ab16e16253ad2936107031e59e4
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Aug 23 10:05:03 2022 +0800

    Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)
---
 .../master/runner/task/CommonTaskProcessor.java    | 39 +++++-----
 .../service/process/ProcessServiceImpl.java        | 91 +++++++++++++---------
 2 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 24b7fd0a7c..e5413c1490 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -31,12 +33,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.Date;
 
-import com.google.auto.service.AutoService;
-
 /**
  * common task processor
  */
@@ -148,24 +146,29 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             taskInstance.setState(TaskExecutionStatus.KILL);
             taskInstance.setEndTime(new Date());
             processService.updateTaskInstance(taskInstance);
-
-            TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
-            killCommand.setTaskInstanceId(taskInstance.getId());
-
-            ExecutionContext executionContext =
-                    new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
-
-            Host host = Host.of(taskInstance.getHost());
-            executionContext.setHost(host);
-
-            nettyExecutorManager.executeDirectly(executionContext);
-        } catch (ExecuteException e) {
-            logger.error("kill task error:", e);
+            if (StringUtils.isNotEmpty(taskInstance.getHost())) {
+                killRemoteTask();
+            }
+        } catch (Exception e) {
+            logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
             return false;
         }
 
-        logger.info("master kill taskInstance name :{} taskInstance id:{}",
+        logger.info("master success kill taskInstance name: {} taskInstance id: {}",
                 taskInstance.getName(), taskInstance.getId());
         return true;
     }
+
+    private void killRemoteTask() throws ExecuteException {
+        TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
+        killCommand.setTaskInstanceId(taskInstance.getId());
+
+        ExecutionContext executionContext =
+                new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
+
+        Host host = Host.of(taskInstance.getHost());
+        executionContext.setHost(host);
+
+        nettyExecutorManager.executeDirectly(executionContext);
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b4faf51168..130e9f1ae9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,24 +17,25 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
-import static java.util.stream.Collectors.toSet;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -116,8 +117,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
+import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.cron.CronUtils;
@@ -128,8 +129,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -144,20 +148,20 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import io.micrometer.core.annotation.Counted;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
 /**
  * process relative dao that some mappers in this.
@@ -2981,8 +2985,10 @@ public class ProcessServiceImpl implements ProcessService {
         // try to get taskGroup
         int count = taskGroupMapper.selectAvailableCountById(groupId);
         if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
+            logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
             return true;
         }
+        logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
         this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
         return false;
     }
@@ -2997,11 +3003,13 @@ public class ProcessServiceImpl implements ProcessService {
                 taskGroupQueue.getId(),
                 TaskGroupQueueStatus.WAIT_QUEUE.getCode());
         if (affectedCount > 0) {
+            logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
             taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
             this.taskGroupQueueMapper.updateById(taskGroupQueue);
             this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
             return true;
         }
+        logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
         return false;
     }
 
@@ -3024,26 +3032,31 @@ public class ProcessServiceImpl implements ProcessService {
 
         TaskGroup taskGroup;
         TaskGroupQueue thisTaskGroupQueue;
+        logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId());
         try {
             do {
                 taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
                 if (taskGroup == null) {
+                    logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
                     return null;
                 }
                 thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
                 if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
+                    logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
                     return null;
                 }
             } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
                     && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
-                            taskGroup.getUseSize(),
-                            thisTaskGroupQueue.getId(),
-                            TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
+                    taskGroup.getUseSize(),
+                    thisTaskGroupQueue.getId(),
+                    TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
         } catch (Exception e) {
             logger.error("release the task group error", e);
             return null;
         }
-        logger.info("updateTask:{}", taskInstance.getName());
+        logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId());
+
+        logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId());
         changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
         TaskGroupQueue taskGroupQueue;
         do {
@@ -3052,11 +3065,13 @@ public class ProcessServiceImpl implements ProcessService {
                     Flag.NO.getCode(),
                     Flag.NO.getCode());
             if (taskGroupQueue == null) {
+                logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
                 return null;
             }
         } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
                 Flag.YES.getCode(),
                 taskGroupQueue.getId()) != 1);
+        logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
         return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
     }