You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/23 06:45:56 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
Refactor worker (#2279)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 92fd647 Refactor worker (#2279)
92fd647 is described below
commit 92fd6479a948883e47757833a77842e6bfbec101
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Mar 23 14:45:07 2020 +0800
Refactor worker (#2279)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config
add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to SpringConnectionFactory
* refactor TaskCallbackService
* add ZookeeperNodeManagerTest
* add NettyExecutorManagerTest
* refactor TaskKillProcessor
---
.../remote/command/TaskKillRequestCommand.java | 2 +-
.../server/entity/TaskExecutionContext.java | 7 --
.../server/master/runner/MasterTaskExecThread.java | 16 +--
.../server/worker/processor/TaskKillProcessor.java | 116 ++++++++++-----------
4 files changed, 60 insertions(+), 81 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index e5c756a..b8e02dd 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public
class TaskKillRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public TaskKillRequestCommand() {
}
public TaskKillRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillRequestCommand{" +
"taskExecutio
nContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
 * Request to kill a task, identified by its task instance id.
 */
public class TaskKillRequestCommand implements Serializable {

    /**
     * id of the task instance this request refers to
     */
    private int taskInstanceId;

    /**
     * @param taskInstanceId id of the task instance this request refers to
     */
    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     * @return id of the task instance this request refers to
     */
    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    /**
     * Packages this request as a command of type TASK_KILL_REQUEST whose
     * body is this object serialized by FastJsonSerializer.
     *
     * @return command
     */
    public Command convert2Command() {
        final Command packaged = new Command();
        packaged.setType(CommandType.TASK_KILL_REQUEST);
        packaged.setBody(FastJsonSerializer.serialize(this));
        return packaged;
    }

    @Override
    public String toString() {
        return new StringBuilder("TaskKillRequestCommand{")
                .append("taskInstanceId=").append(taskInstanceId)
                .append('}')
                .toString();
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index cdaa0f0..0d88d6a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
@@ -421,12 +420,6 @@ public class TaskExecutionContext implements Serializable{
return requestCommand.convert2Command();
}
- public Command toKillCommand(){
- TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
- return requestCommand.convert2Command();
- }
-
@Override
public String toString() {
return "TaskExecutionContext{" +
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 172794e..f220a09 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
+import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@@ -25,9 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -172,8 +172,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
- * TODO Kill TASK
- *
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance() throws Exception{
@@ -182,14 +180,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
alreadyKilled = true;
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskInstanceId(taskInstance.getId());
- taskExecutionContext.setHost(taskInstance.getHost());
- taskExecutionContext.setLogPath(taskInstance.getLogPath());
- taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
- taskExecutionContext.setProcessId(taskInstance.getPid());
+ TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
+ killCommand.setTaskInstanceId(taskInstance.getId());
- ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
+ ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 5a8c668..b6f5827 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
/**
@@ -66,11 +67,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
- /**
- * appIds
- */
- private List<String> appIds;
-
public TaskKillProcessor(){
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
@@ -79,86 +75,80 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
/**
- * kill task logic
+ * task kill process
*
- * @param context context
- * @return execute result
+ * @param channel channel channel
+ * @param command command command
*/
- private Boolean doKill(TaskExecutionContext context){
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+ TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
+ logger.info("received kill command : {}", killCommand);
+
+ Pair<Boolean, List<String>> result = doKill(killCommand);
+
+ taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
+ new NettyRemoteChannel(channel, command.getOpaque()));
+
+ TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
+ taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+ }
+
+ /**
+ * do kill
+ * @param killCommand
+ * @return kill result
+ */
+ private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
+ List<String> appIds = Collections.EMPTY_LIST;
try {
- TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
- context.setProcessId(taskExecutionContext.getProcessId());
+ TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){
- logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
- return false;
+ logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
+ return Pair.of(false, appIds);
}
+ String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
- String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
-
- logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
+ logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
-
// find log and kill yarn job
- killYarnJob(Host.of(context.getHost()).getIp(),
- context.getLogPath(),
- context.getExecutePath(),
- context.getTenantCode());
+ appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
+ taskExecutionContext.getLogPath(),
+ taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTenantCode());
- return true;
+ return Pair.of(true, appIds);
} catch (Exception e) {
logger.error("kill task error", e);
- return false;
}
- }
-
- /**
- * task kill process
- *
- * @param channel channel channel
- * @param command command command
- */
- @Override
- public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
- TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
- logger.info("received command : {}", taskKillRequestCommand);
-
-
- String contextJson = taskKillRequestCommand.getTaskExecutionContext();
-
- TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
-
- Boolean killStatus = doKill(taskExecutionContext);
-
- taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
- new NettyRemoteChannel(channel, command.getOpaque()));
-
- TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
- taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+ return Pair.of(false, appIds);
}
/**
* build TaskKillResponseCommand
*
- * @param taskExecutionContext taskExecutionContext
- * @param killStatus killStatus
+ * @param killCommand kill command
+ * @param result exe result
* @return build TaskKillResponseCommand
*/
- private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
- Boolean killStatus) {
+ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
+ Pair<Boolean, List<String>> result) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
- taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskKillResponseCommand.setHost(taskExecutionContext.getHost());
- taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
- taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
- taskKillResponseCommand.setAppIds(appIds);
-
+ taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
+ taskKillResponseCommand.setAppIds(result.getRight());
+ TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+ if(taskExecutionContext != null){
+ taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+ taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ }
return taskKillResponseCommand;
}
@@ -169,8 +159,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
+ * @return List<String> appIds
*/
- private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
+ private List<String> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
LogClientService logClient = null;
try {
logClient = new LogClientService();
@@ -185,9 +176,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
+ return appIds;
}
}
-
} catch (Exception e) {
logger.error("kill yarn job error",e);
} finally {
@@ -195,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logClient.close();
}
}
+ return Collections.EMPTY_LIST;
}
}