You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/24 03:40:33 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: add
comment (#2006)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 6053f9b add comment (#2006)
6053f9b is described below
commit 6053f9b2dfceaf7ebfe2a3d9e5ba9ed40cd6f62b
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Feb 24 11:40:24 2020 +0800
add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
---
.../apache/dolphinscheduler/remote/utils/Host.java | 36 ++++++++---
.../server/master/dispatch/ExecutorDispatcher.java | 51 ++++++++++++---
.../master/dispatch/context/ExecutionContext.java | 12 ++++
.../server/master/dispatch/enums/ExecutorType.java | 4 +-
.../dispatch/exceptions/ExecuteException.java | 4 +-
.../dispatch/executor/AbstractExecutorManager.java | 21 ++++--
.../master/dispatch/executor/ExecutorManager.java | 23 ++++++-
.../dispatch/executor/NettyExecutorManager.java | 75 ++++++++++++++++++----
.../server/master/dispatch/host/HostManager.java | 8 +++
.../dispatch/host/RoundRobinHostManager.java | 23 +++++++
.../dispatch/host/assign/RandomSelector.java | 11 +++-
.../dispatch/host/assign/RoundRobinSelector.java | 10 +++
.../master/dispatch/host/assign/Selector.java | 9 +++
.../server/master/future/TaskFuture.java | 29 +++++----
.../server/master/processor/TaskAckProcessor.java | 8 +++
.../server/registry/ZookeeperNodeManager.java | 71 +++++++++++++++++++-
.../server/registry/ZookeeperRegistryCenter.java | 61 +++++++++++++++++-
17 files changed, 397 insertions(+), 59 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index f53c611..fde6830 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -24,10 +24,19 @@ import java.util.Objects;
*/
public class Host implements Serializable {
+ /**
+ * address
+ */
private String address;
+ /**
+ * ip
+ */
private String ip;
+ /**
+ * port
+ */
private int port;
public Host() {
@@ -65,6 +74,11 @@ public class Host implements Serializable {
this.address = ip + ":" + port;
}
+ /**
+ * address convert host
+ * @param address address
+ * @return host
+ */
public static Host of(String address){
String[] parts = address.split(":");
if (parts.length != 2) {
@@ -75,16 +89,13 @@ public class Host implements Serializable {
}
@Override
- public String toString() {
- return "Host{" +
- "address='" + address + '\'' +
- '}';
- }
-
- @Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
Host host = (Host) o;
return Objects.equals(getAddress(), host.getAddress());
}
@@ -93,4 +104,11 @@ public class Host implements Serializable {
public int hashCode() {
return Objects.hash(getAddress());
}
+
+ @Override
+ public String toString() {
+ return "Host{" +
+ "address='" + address + '\'' +
+ '}';
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 2fd303a..01fb840 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -32,12 +32,21 @@ import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
+/**
+ * executor dispatcher
+ */
@Service
public class ExecutorDispatcher implements InitializingBean {
+ /**
+ * netty executor manager
+ */
@Autowired
private NettyExecutorManager nettyExecutorManager;
+ /**
+ * round robin host manager
+ */
@Autowired
private RoundRobinHostManager hostManager;
@@ -47,30 +56,54 @@ public class ExecutorDispatcher implements InitializingBean {
this.executorManagers = new ConcurrentHashMap<>();
}
- public void dispatch(final ExecutionContext executeContext) throws ExecuteException {
- ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType());
+ /**
+ * task dispatch
+ * @param context context
+ * @throws ExecuteException
+ */
+ public void dispatch(final ExecutionContext context) throws ExecuteException {
+ /**
+ * get executor manager
+ */
+ ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){
- throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType());
+ throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
- Host host = hostManager.select(executeContext);
+
+ /**
+ * host select
+ */
+ Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
- throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext()));
+ throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
}
- executeContext.setHost(host);
- executorManager.beforeExecute(executeContext);
+ context.setHost(host);
+ executorManager.beforeExecute(context);
try {
- executorManager.execute(executeContext);
+ /**
+ * task execute
+ */
+ executorManager.execute(context);
} finally {
- executorManager.afterExecute(executeContext);
+ executorManager.afterExecute(context);
}
}
+ /**
+ * register init
+ * @throws Exception
+ */
@Override
public void afterPropertiesSet() throws Exception {
register(ExecutorType.WORKER, nettyExecutorManager);
register(ExecutorType.CLIENT, nettyExecutorManager);
}
+ /**
+ * register
+ * @param type executor type
+ * @param executorManager executorManager
+ */
public void register(ExecutorType type, ExecutorManager executorManager){
executorManagers.put(type, executorManager);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 4bccba0..14c7d9f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -20,12 +20,24 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+/**
+ * execution context
+ */
public class ExecutionContext {
+ /**
+ * host
+ */
private Host host;
+ /**
+ * context
+ */
private final Object context;
+ /**
+ * executor type : worker or client
+ */
private final ExecutorType executorType;
public ExecutionContext(Object context, ExecutorType executorType) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
index 70aaeae..03be62e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
@@ -16,7 +16,9 @@
*/
package org.apache.dolphinscheduler.server.master.dispatch.enums;
-
+/**
+ * executor type
+ */
public enum ExecutorType {
WORKER,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
index d8ca50a..8a441b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
@@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
-
+/**
+ * execute exception
+ */
public class ExecuteException extends Exception{
public ExecuteException() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index 65ed15e..e1f0c3c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -20,17 +20,26 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-
+/**
+ * abstract executor manager
+ */
public abstract class AbstractExecutorManager implements ExecutorManager{
+ /**
+ * before execute , add time monitor , timeout
+ * @param context context
+ * @throws ExecuteException
+ */
@Override
- public void beforeExecute(ExecutionContext executeContext) throws ExecuteException {
- //TODO add time monitor
+ public void beforeExecute(ExecutionContext context) throws ExecuteException {
}
+ /**
+ * after execute , add dispatch monitor
+ * @param context context
+ * @throws ExecuteException
+ */
@Override
- public void afterExecute(ExecutionContext executeContext) throws ExecuteException {
- //TODO add dispatch monitor
-
+ public void afterExecute(ExecutionContext context) throws ExecuteException {
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 98d391e..1d78d2f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -20,12 +20,29 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-
+/**
+ * executor manager
+ */
public interface ExecutorManager {
+ /**
+ * before execute
+ * @param executeContext executeContext
+ * @throws ExecuteException
+ */
void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
- void execute(ExecutionContext executeContext) throws ExecuteException;
+ /**
+ * execute task
+ * @param context context
+ * @throws ExecuteException
+ */
+ void execute(ExecutionContext context) throws ExecuteException;
- void afterExecute(ExecutionContext executeContext) throws ExecuteException;
+ /**
+ * after execute
+ * @param context context
+ * @throws ExecuteException
+ */
+ void afterExecute(ExecutionContext context) throws ExecuteException;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index e24bbe7..e07bea4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -42,47 +42,78 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-
+/**
+ * netty executor manager
+ */
@Service
public class NettyExecutorManager extends AbstractExecutorManager{
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
+ /**
+ * zookeeper node manager
+ */
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
+ /**
+ * netty remote client
+ */
private final NettyRemotingClient nettyRemotingClient;
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ /**
+ * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
+ * register EXECUTE_TASK_ACK command type TaskAckProcessor
+ */
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
+ /**
+ * execute logic
+ * @param context context
+ * @throws ExecuteException
+ */
@Override
- public void execute(ExecutionContext executeContext) throws ExecuteException {
- Set<String> allNodes = getAllNodes(executeContext);
+ public void execute(ExecutionContext context) throws ExecuteException {
+
+ /**
+ * all nodes
+ */
+ Set<String> allNodes = getAllNodes(context);
+
+ /**
+ * fail nodes
+ */
Set<String> failNodeSet = new HashSet<>();
- //
- Command command = buildCommand(executeContext);
- Host host = executeContext.getHost();
+
+ /**
+ * build command accord executeContext
+ */
+ Command command = buildCommand(context);
+
+ /**
+ * execute task host
+ */
+ Host host = context.getHost();
boolean success = false;
- //
while (!success) {
try {
- doExecute(host, command);
+ doExecute(host,command);
success = true;
- executeContext.setHost(host);
+ context.setHost(host);
} catch (ExecuteException ex) {
- logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
+ logger.error(String.format("execute context : %s error", context.getContext()), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
- logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
+ logger.error("retry execute context : {} host : {}", context.getContext(), host);
} else {
throw new ExecuteException("fail after try all nodes");
}
@@ -93,6 +124,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{
}
}
+ /**
+ * build command
+ * @param context context
+ * @return command
+ */
private Command buildCommand(ExecutionContext context) {
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
ExecutorType executorType = context.getExecutorType();
@@ -110,7 +146,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
return requestCommand.convert2Command();
}
+ /**
+ * execute logic
+ * @param host host
+ * @param command command
+ * @throws ExecuteException
+ */
private void doExecute(final Host host, final Command command) throws ExecuteException {
+ /**
+ * retry count,default retry 3
+ */
int retryCount = 3;
boolean success = false;
do {
@@ -131,8 +176,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
}
}
+ /**
+ * get all nodes
+ * @param context context
+ * @return nodes
+ */
private Set<String> getAllNodes(ExecutionContext context){
Set<String> nodes = Collections.EMPTY_SET;
+ /**
+ * executor type
+ */
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index 8708273..ec65cab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -21,8 +21,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+/**
+ * host manager
+ */
public interface HostManager {
+ /**
+ * select host
+ * @param context context
+ * @return host
+ */
Host select(ExecutionContext context);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 1c222b8..3bb001e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -34,24 +34,44 @@ import java.util.Collection;
import java.util.List;
+/**
+ * round robin host manager
+ */
@Service
public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
+ /**
+ * zookeeperNodeManager
+ */
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
+ /**
+ * selector
+ */
private final Selector<Host> selector;
+ /**
+ * set round robin
+ */
public RoundRobinHostManager(){
this.selector = new RoundRobinSelector<>();
}
+ /**
+ * select host
+ * @param context context
+ * @return host
+ */
@Override
public Host select(ExecutionContext context){
Host host = new Host();
Collection<String> nodes = null;
+ /**
+ * executor type
+ */
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
@@ -69,6 +89,9 @@ public class RoundRobinHostManager implements HostManager {
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
+ /**
+ * select
+ */
return selector.select(candidateHosts);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index cf8c0e8..be52fcb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -20,7 +20,10 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Random;
-
+/**
+ * random selector
+ * @param <T> T
+ */
public class RandomSelector<T> implements Selector<T> {
private final Random random = new Random();
@@ -32,11 +35,17 @@ public class RandomSelector<T> implements Selector<T> {
throw new IllegalArgumentException("Empty source.");
}
+ /**
+ * if only one , return directly
+ */
if (source.size() == 1) {
return (T) source.toArray()[0];
}
int size = source.size();
+ /**
+ * random select
+ */
int randomIndex = random.nextInt(size);
return (T) source.toArray()[randomIndex];
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index 90319de..1eb30c8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -21,6 +21,10 @@ import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * round robin selector
+ * @param <T> T
+ */
@Service
public class RoundRobinSelector<T> implements Selector<T> {
@@ -32,11 +36,17 @@ public class RoundRobinSelector<T> implements Selector<T> {
throw new IllegalArgumentException("Empty source.");
}
+ /**
+ * if only one , return directly
+ */
if (source.size() == 1) {
return (T)source.toArray()[0];
}
int size = source.size();
+ /**
+ * round robin
+ */
return (T) source.toArray()[index.getAndIncrement() % size];
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index bd7c4ac..0864981 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -20,7 +20,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
+/**
+ * selector
+ * @param <T> T
+ */
public interface Selector<T> {
+ /**
+ * select
+ * @param source source
+ * @return T
+ */
T select(Collection<T> source);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
index 32fb55f..0c6d740 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+/**
+ * task fulture
+ */
public class TaskFuture {
private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class);
@@ -139,19 +142,6 @@ public class TaskFuture {
}
- @Override
- public String toString() {
- return "ResponseFuture{" +
- "opaque=" + opaque +
- ", timeoutMillis=" + timeoutMillis +
- ", latch=" + latch +
- ", beginTimestamp=" + beginTimestamp +
- ", responseCommand=" + responseCommand +
- ", sendOk=" + sendOk +
- ", cause=" + cause +
- '}';
- }
-
/**
* scan future table
*/
@@ -168,4 +158,17 @@ public class TaskFuture {
}
}
}
+
+ @Override
+ public String toString() {
+ return "TaskFuture{" +
+ "opaque=" + opaque +
+ ", timeoutMillis=" + timeoutMillis +
+ ", latch=" + latch +
+ ", beginTimestamp=" + beginTimestamp +
+ ", responseCommand=" + responseCommand +
+ ", sendOk=" + sendOk +
+ ", cause=" + cause +
+ '}';
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index f5f2123..83da3b0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -46,11 +46,19 @@ public class TaskAckProcessor implements NettyRequestProcessor {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
+ /**
+ * task ack process
+ * @param channel channel channel
+ * @param command command ExecuteTaskAckCommand
+ */
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
+ /**
+ * change Task state
+ */
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
taskAckCommand.getHost(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index c7a2d0b..1d6808d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -33,37 +33,80 @@ import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+/**
+ * zookeeper node manager
+ */
@Service
public class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
+ /**
+ * master lock
+ */
private final Lock masterLock = new ReentrantLock();
+ /**
+ * worker lock
+ */
private final Lock workerLock = new ReentrantLock();
+ /**
+ * worker nodes
+ */
private final Set<String> workerNodes = new HashSet<>();
+ /**
+ * master nodes
+ */
private final Set<String> masterNodes = new HashSet<>();
+ /**
+ * zookeeper registry center
+ */
@Autowired
private ZookeeperRegistryCenter registryCenter;
+ /**
+ * init listener
+ * @throws Exception
+ */
@Override
public void afterPropertiesSet() throws Exception {
+ /**
+ * load nodes from zookeeper
+ */
load();
+ /**
+ * init MasterNodeListener listener
+ */
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
+ /**
+ * init WorkerNodeListener listener
+ */
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
}
+ /**
+ * load nodes from zookeeper
+ */
private void load(){
- Set<String> schedulerNodes = registryCenter.getMasterNodesDirectly();
- syncMasterNodes(schedulerNodes);
+ /**
+ * master nodes from zookeeper
+ */
+ Set<String> masterNodes = registryCenter.getMasterNodesDirectly();
+ syncMasterNodes(masterNodes);
+
+ /**
+ * worker nodes from zookeeper
+ */
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(workersNodes);
}
+ /**
+ * worker node listener
+ */
class WorkerNodeListener extends AbstractListener {
@Override
@@ -91,6 +134,9 @@ public class ZookeeperNodeManager implements InitializingBean {
}
+ /**
+ * master node listener
+ */
class MasterNodeListener extends AbstractListener {
@Override
@@ -115,6 +161,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+ * get master nodes
+ * @return master nodes
+ */
public Set<String> getMasterNodes() {
masterLock.lock();
try {
@@ -124,6 +174,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+ * sync master nodes
+ * @param nodes master nodes
+ */
private void syncMasterNodes(Set<String> nodes){
masterLock.lock();
try {
@@ -134,6 +188,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+ * sync worker nodes
+ * @param nodes worker nodes
+ */
private void syncWorkerNodes(Set<String> nodes){
workerLock.lock();
try {
@@ -144,6 +202,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+ * get worker nodes
+ * @return worker nodes
+ */
public Set<String> getWorkerNodes(){
workerLock.lock();
try {
@@ -153,6 +215,9 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+ * close
+ */
public void close(){
registryCenter.close();
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3364a94..7d7e2ef 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -27,17 +27,32 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * zookeeper register center
+ */
@Service
public class ZookeeperRegistryCenter implements InitializingBean {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ /**
+ * namespace
+ */
public static final String NAMESPACE = "/dolphinscheduler";
+ /**
+ * nodes namespace
+ */
public static final String NODES = NAMESPACE + "/nodes";
+ /**
+ * master path
+ */
public static final String MASTER_PATH = NODES + "/master";
+ /**
+ * worker path
+ */
public static final String WORKER_PATH = NODES + "/worker";
public static final String EMPTY = "";
@@ -50,19 +65,26 @@ public class ZookeeperRegistryCenter implements InitializingBean {
init();
}
+ /**
+ * init node persist
+ */
public void init() {
if (isStarted.compareAndSet(false, true)) {
- //TODO
-// zookeeperCachedOperator.start(NODES);
initNodes();
}
}
+ /**
+ * init nodes
+ */
private void initNodes() {
zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
}
+ /**
+ * close
+ */
public void close() {
if (isStarted.compareAndSet(true, false)) {
if (zookeeperCachedOperator != null) {
@@ -71,36 +93,71 @@ public class ZookeeperRegistryCenter implements InitializingBean {
}
}
+ /**
+ * get master path
+ * @return master path
+ */
public String getMasterPath() {
return MASTER_PATH;
}
+ /**
+ * get worker path
+ * @return worker path
+ */
public String getWorkerPath() {
return WORKER_PATH;
}
+ /**
+ * get master nodes directly
+ * @return master nodes
+ */
public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH);
return new HashSet<>(masters);
}
+ /**
+ * get worker nodes directly
+ * @return master nodes
+ */
public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH);
return new HashSet<>(workers);
}
+ /**
+ * whether worker path
+ * @param path path
+ * @return result
+ */
public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH);
}
+ /**
+ * whether master path
+ * @param path path
+ * @return result
+ */
public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH);
}
+ /**
+ * get children nodes
+ * @param key key
+ * @return children nodes
+ */
public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key);
}
+ /**
+ * get zookeeperCachedOperator
+ * @return zookeeperCachedOperator
+ */
public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator;
}