You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/25 06:01:04 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
ExecutorManager interface add generic type (#2012)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 33bf550 ExecutorManager interface add generic type (#2012)
33bf550 is described below
commit 33bf550a33caa47b701b43a787e362dc7a4601f4
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Feb 25 14:00:59 2020 +0800
ExecutorManager interface add generic type (#2012)
* 1. master persistent task
2. extract master and worker communication model
* 1. master persistent task
2. extract master and worker communication model
* 1. master persistent task
2. extract master and worker communication model
* add license
* fix Javadoc error
* modify TaskExecutionContext creation
* fix taskInstanceId not being set in buildAckCommand
* fix Javadoc error
* add comment
* ExecutorManager interface add generic type
---
.../server/master/dispatch/ExecutorDispatcher.java | 8 ++++----
.../dispatch/executor/AbstractExecutorManager.java | 2 +-
.../master/dispatch/executor/ExecutorManager.java | 4 ++--
.../dispatch/executor/NettyExecutorManager.java | 12 +++++++++---
.../master/runner/MasterBaseTaskExecThread.java | 22 +++++-----------------
5 files changed, 21 insertions(+), 27 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 01fb840..8a803a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private RoundRobinHostManager hostManager;
- private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers;
+ private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>();
@@ -61,11 +61,11 @@ public class ExecutorDispatcher implements InitializingBean {
* @param context context
* @throws ExecuteException
*/
- public void dispatch(final ExecutionContext context) throws ExecuteException {
+ public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
- ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
+ ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
@@ -83,7 +83,7 @@ public class ExecutorDispatcher implements InitializingBean {
/**
* task execute
*/
- executorManager.execute(context);
+ return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index e1f0c3c..c0be5a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
/**
* abstract executor manager
*/
-public abstract class AbstractExecutorManager implements ExecutorManager{
+public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
/**
* before execute , add time monitor , timeout
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 1d78d2f..9b0b9af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
/**
* executor manager
*/
-public interface ExecutorManager {
+public interface ExecutorManager<T> {
/**
* before execute
@@ -37,7 +37,7 @@ public interface ExecutorManager {
* @param context context
* @throws ExecuteException
*/
- void execute(ExecutionContext context) throws ExecuteException;
+ T execute(ExecutionContext context) throws ExecuteException;
/**
* after execute
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index cf1a264..bdfe71c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -46,7 +46,7 @@ import java.util.Set;
* netty executor manager
*/
@Service
-public class NettyExecutorManager extends AbstractExecutorManager{
+public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
@@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ /**
+ * register TaskResponseProcessor for the EXECUTE_TASK_RESPONSE command type
+ * register TaskAckProcessor for the EXECUTE_TASK_ACK command type
+ */
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
@@ -74,7 +78,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
* @throws ExecuteException
*/
@Override
- public void execute(ExecutionContext context) throws ExecuteException {
+ public Boolean execute(ExecutionContext context) throws ExecuteException {
/**
* all nodes
@@ -118,6 +122,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
}
}
}
+
+ return success;
}
/**
@@ -189,7 +195,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
break;
case CLIENT:
break;
- default:
+ default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index f675493..9bf69dd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.utils.BeanContext;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.future.InvokeCallback;
-import org.apache.dolphinscheduler.remote.future.ResponseFuture;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable;
@@ -138,14 +125,16 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* dispatch task to worker
* @param taskInstance
*/
- public void dispatch(TaskInstance taskInstance){
+ private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
try {
- dispatcher.dispatch(executionContext);
+ return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
+ return false;
}
+
}
/**
@@ -234,8 +223,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
if(submitDB && !submitQueue){
// submit task to queue
- dispatch(task);
- submitQueue = true;
+ submitQueue = dispatch(task);
}
if(submitDB && submitQueue){
return task;