You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/04 14:22:04 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
Refactor worker (#2081)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 15bf740 Refactor worker (#2081)
15bf740 is described below
commit 15bf740221a3a1731360dfbae0618f7623ba471d
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Mar 4 22:21:54 2020 +0800
Refactor worker (#2081)
* refactor kill logic
* refactor ExecutionContext
---
.../server/entity/TaskExecutionContext.java | 17 +++++++
.../server/master/dispatch/ExecutorDispatcher.java | 7 +--
.../master/dispatch/context/ExecutionContext.java | 35 +++++++++----
.../dispatch/executor/NettyExecutorManager.java | 57 ++--------------------
.../master/runner/MasterBaseTaskExecThread.java | 9 ++--
.../server/master/runner/MasterTaskExecThread.java | 3 +-
6 files changed, 53 insertions(+), 75 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 2348b47..03ce0ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.entity;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
@@ -397,6 +402,18 @@ public class TaskExecutionContext implements Serializable{
this.dataxTaskExecutionContext = dataxTaskExecutionContext;
}
+ public Command toCommand(){
+ TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
+ requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
+ return requestCommand.convert2Command();
+ }
+
+ public Command toKillCommand(){
+ TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
+ requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
+ return requestCommand.convert2Command();
+ }
+
@Override
public String toString() {
return "TaskExecutionContext{" +
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 97b489e..38914e5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -45,9 +44,6 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private NettyExecutorManager nettyExecutorManager;
- @Autowired
- private MasterConfig masterConfig;
-
/**
* round robin host manager
*/
@@ -87,10 +83,9 @@ public class ExecutorDispatcher implements InitializingBean {
*/
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
- throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
+ throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getCommand()));
}
context.setHost(host);
- context.getContext().setHost(host.getAddress());
executorManager.beforeExecute(context);
try {
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 19124d3..9d04511 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
+import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
/**
@@ -32,30 +32,47 @@ public class ExecutionContext {
private Host host;
/**
- * context
+ * command
*/
- private final TaskExecutionContext context;
+ private final Command command;
/**
* executor type : worker or client
*/
private final ExecutorType executorType;
- public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) {
- this.context = context;
+ /**
+ * worker group
+ */
+ private String workerGroup;
+
+
+ public ExecutionContext(Command command, ExecutorType executorType) {
+ this.command = command;
this.executorType = executorType;
}
- public String getWorkerGroup(){
- return context.getWorkerGroup();
+ public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
+ this.command = command;
+ this.executorType = executorType;
+ this.workerGroup = workerGroup;
+ }
+
+ public Command getCommand() {
+ return command;
}
public ExecutorType getExecutorType() {
return executorType;
}
- public TaskExecutionContext getContext() {
- return context;
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
+
+
+ public String getWorkerGroup(){
+ return this.workerGroup;
}
public Host getHost() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 7719cf8..e3d45f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -21,12 +21,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -98,7 +94,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
/**
* build command accord executeContext
*/
- Command command = buildCommand(context);
+ Command command = context.getCommand();
/**
* execute task host
@@ -111,14 +107,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
- logger.error(String.format("execute context : %s error", context.getContext()), ex);
+ logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
- logger.error("retry execute context : {} host : {}", context.getContext(), host);
+ logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
@@ -133,53 +129,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
@Override
public void executeDirectly(ExecutionContext context) throws ExecuteException {
- Command command = buildKillCommand(context);
Host host = context.getHost();
- doExecute(host,command);
- }
-
- /**
- * build command
- * @param context context
- * @return command
- */
- private Command buildCommand(ExecutionContext context) {
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
- ExecutorType executorType = context.getExecutorType();
- switch (executorType){
- case WORKER:
- TaskExecutionContext taskExecutionContext = context.getContext();
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
- break;
- case CLIENT:
- break;
- default:
- throw new IllegalArgumentException("invalid executor type : " + executorType);
-
- }
- return requestCommand.convert2Command();
- }
-
- /**
- * build command
- * @param context context
- * @return command
- */
- private Command buildKillCommand(ExecutionContext context) {
- TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
- ExecutorType executorType = context.getExecutorType();
- switch (executorType){
- case WORKER:
- TaskExecutionContext taskExecutionContext = context.getContext();
- requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
- break;
- case CLIENT:
- break;
- default:
- throw new IllegalArgumentException("invalid executor type : " + executorType);
-
- }
- return requestCommand.convert2Command();
+ doExecute(host, context.getCommand());
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 9d40de9..71bb8f8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
-import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
/**
* master task exec base class
@@ -131,7 +130,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
- ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
@@ -227,8 +226,8 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
if(submitDB && !submitTask){
- // dispatcht task
- submitTask = dispatchtTask(task);
+ // dispatch task
+ submitTask = dispatchTask(task);
}
if(submitDB && submitTask){
return task;
@@ -254,7 +253,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance
* @return whether submit task success
*/
- public Boolean dispatchtTask(TaskInstance taskInstance) {
+ public Boolean dispatchTask(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index a196832..fb3f8e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -184,7 +183,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
alreadyKilled = true;
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
- ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
+ ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);