You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/23 02:05:11 UTC
[dolphinscheduler] branch dev updated: Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4d13a5104b Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)
4d13a5104b is described below
commit 4d13a5104b9a4ab16e16253ad2936107031e59e4
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Aug 23 10:05:03 2022 +0800
Fix taskGroup not being released when killing a task fails, and add taskGroup logging (#11469)
---
.../master/runner/task/CommonTaskProcessor.java | 39 +++++-----
.../service/process/ProcessServiceImpl.java | 91 +++++++++++++---------
2 files changed, 74 insertions(+), 56 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 24b7fd0a7c..e5413c1490 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner.task;
+import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -31,12 +33,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.Date;
-import com.google.auto.service.AutoService;
-
/**
* common task processor
*/
@@ -148,24 +146,29 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
-
- TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
- killCommand.setTaskInstanceId(taskInstance.getId());
-
- ExecutionContext executionContext =
- new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
-
- Host host = Host.of(taskInstance.getHost());
- executionContext.setHost(host);
-
- nettyExecutorManager.executeDirectly(executionContext);
- } catch (ExecuteException e) {
- logger.error("kill task error:", e);
+ if (StringUtils.isNotEmpty(taskInstance.getHost())) {
+ killRemoteTask();
+ }
+ } catch (Exception e) {
+ logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
return false;
}
- logger.info("master kill taskInstance name :{} taskInstance id:{}",
+ logger.info("master success kill taskInstance name: {} taskInstance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
+
+ private void killRemoteTask() throws ExecuteException {
+ TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
+ killCommand.setTaskInstanceId(taskInstance.getId());
+
+ ExecutionContext executionContext =
+ new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
+
+ Host host = Host.of(taskInstance.getHost());
+ executionContext.setHost(host);
+
+ nettyExecutorManager.executeDirectly(executionContext);
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index b4faf51168..130e9f1ae9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,24 +17,25 @@
package org.apache.dolphinscheduler.service.process;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
-import static java.util.stream.Collectors.toSet;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -116,8 +117,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
+import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.cron.CronUtils;
@@ -128,8 +129,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
@@ -144,20 +148,20 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import io.micrometer.core.annotation.Counted;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/**
* process relative dao that some mappers in this.
@@ -2981,8 +2985,10 @@ public class ProcessServiceImpl implements ProcessService {
// try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
+ logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
return true;
}
+ logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
@@ -2997,11 +3003,13 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
+ logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
+ logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
return false;
}
@@ -3024,26 +3032,31 @@ public class ProcessServiceImpl implements ProcessService {
TaskGroup taskGroup;
TaskGroupQueue thisTaskGroupQueue;
+ logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId());
try {
do {
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
+ logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
return null;
}
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
+ logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
return null;
}
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
- taskGroup.getUseSize(),
- thisTaskGroupQueue.getId(),
- TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
+ taskGroup.getUseSize(),
+ thisTaskGroupQueue.getId(),
+ TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
logger.error("release the task group error", e);
return null;
}
- logger.info("updateTask:{}", taskInstance.getName());
+ logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId());
+
+ logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue;
do {
@@ -3052,11 +3065,13 @@ public class ProcessServiceImpl implements ProcessService {
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
+ logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
return null;
}
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
+ logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}