You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/25 06:25:41 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#6)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 89a70f8  Refactor worker (#6)
     new 318a1c3  Merge branch 'refactor-worker' into refactor-worker
89a70f8 is described below

commit 89a70f8b2ba6a83f458877bd3bdc12b374582eb2
Author: Tboy <te...@yeah.net>
AuthorDate: Tue Feb 25 10:37:15 2020 +0800

    Refactor worker (#6)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../apache/dolphinscheduler/remote/utils/Host.java | 36 ++++++++---
 .../server/master/dispatch/ExecutorDispatcher.java | 51 +++++++++++++---
 .../master/dispatch/context/ExecutionContext.java  | 12 ++++
 .../server/master/dispatch/enums/ExecutorType.java |  4 +-
 .../dispatch/exceptions/ExecuteException.java      |  4 +-
 .../dispatch/executor/AbstractExecutorManager.java | 21 +++++--
 .../master/dispatch/executor/ExecutorManager.java  | 23 ++++++-
 .../dispatch/executor/NettyExecutorManager.java    | 71 ++++++++++++++++++----
 .../server/master/dispatch/host/HostManager.java   |  8 +++
 .../dispatch/host/RoundRobinHostManager.java       | 23 +++++++
 .../dispatch/host/assign/RandomSelector.java       | 11 +++-
 .../dispatch/host/assign/RoundRobinSelector.java   | 10 +++
 .../master/dispatch/host/assign/Selector.java      |  9 +++
 .../server/master/future/TaskFuture.java           | 29 +++++----
 .../server/master/processor/TaskAckProcessor.java  |  8 +++
 .../server/registry/ZookeeperNodeManager.java      | 71 +++++++++++++++++++++-
 .../server/registry/ZookeeperRegistryCenter.java   | 61 ++++++++++++++++++-
 17 files changed, 393 insertions(+), 59 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index f53c611..fde6830 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -24,10 +24,19 @@ import java.util.Objects;
  */
 public class Host implements Serializable {
 
+    /**
+     * address
+     */
     private String address;
 
+    /**
+     * ip
+     */
     private String ip;
 
+    /**
+     * port
+     */
     private int port;
 
     public Host() {
@@ -65,6 +74,11 @@ public class Host implements Serializable {
         this.address = ip + ":" + port;
     }
 
+    /**
+     * address convert host
+     * @param address address
+     * @return host
+     */
     public static Host of(String address){
         String[] parts = address.split(":");
         if (parts.length != 2) {
@@ -75,16 +89,13 @@ public class Host implements Serializable {
     }
 
     @Override
-    public String toString() {
-        return "Host{" +
-                "address='" + address + '\'' +
-                '}';
-    }
-
-    @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         Host host = (Host) o;
         return Objects.equals(getAddress(), host.getAddress());
     }
@@ -93,4 +104,11 @@ public class Host implements Serializable {
     public int hashCode() {
         return Objects.hash(getAddress());
     }
+
+    @Override
+    public String toString() {
+        return "Host{" +
+                "address='" + address + '\'' +
+                '}';
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 2fd303a..01fb840 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -32,12 +32,21 @@ import org.springframework.stereotype.Service;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * executor dispatcher
+ */
 @Service
 public class ExecutorDispatcher implements InitializingBean {
 
+    /**
+     * netty executor manager
+     */
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
 
+    /**
+     * round robin host manager
+     */
     @Autowired
     private RoundRobinHostManager hostManager;
 
@@ -47,30 +56,54 @@ public class ExecutorDispatcher implements InitializingBean {
         this.executorManagers = new ConcurrentHashMap<>();
     }
 
-    public void dispatch(final ExecutionContext executeContext) throws ExecuteException {
-        ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType());
+    /**
+     *  task dispatch
+     * @param context context
+     * @throws ExecuteException
+     */
+    public void dispatch(final ExecutionContext context) throws ExecuteException {
+        /**
+         * get executor manager
+         */
+        ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
         if(executorManager == null){
-            throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType());
+            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
         }
-        Host host = hostManager.select(executeContext);
+
+        /**
+         * host select
+         */
+        Host host = hostManager.select(context);
         if (StringUtils.isEmpty(host.getAddress())) {
-            throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext()));
+            throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
         }
-        executeContext.setHost(host);
-        executorManager.beforeExecute(executeContext);
+        context.setHost(host);
+        executorManager.beforeExecute(context);
         try {
-            executorManager.execute(executeContext);
+            /**
+             * task execute
+             */
+            executorManager.execute(context);
         } finally {
-            executorManager.afterExecute(executeContext);
+            executorManager.afterExecute(context);
         }
     }
 
+    /**
+     * register init
+     * @throws Exception
+     */
     @Override
     public void afterPropertiesSet() throws Exception {
         register(ExecutorType.WORKER, nettyExecutorManager);
         register(ExecutorType.CLIENT, nettyExecutorManager);
     }
 
+    /**
+     *  register
+     * @param type executor type
+     * @param executorManager executorManager
+     */
     public void register(ExecutorType type, ExecutorManager executorManager){
         executorManagers.put(type, executorManager);
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 4bccba0..14c7d9f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -20,12 +20,24 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
+/**
+ *  execution context
+ */
 public class ExecutionContext {
 
+    /**
+     * host
+     */
     private Host host;
 
+    /**
+     *  context
+     */
     private final Object context;
 
+    /**
+     *  executor type : worker or client
+     */
     private final ExecutorType executorType;
 
     public ExecutionContext(Object context, ExecutorType executorType) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
index 70aaeae..03be62e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
@@ -16,7 +16,9 @@
  */
 package org.apache.dolphinscheduler.server.master.dispatch.enums;
 
-
+/**
+ *  executor type
+ */
 public enum ExecutorType {
 
     WORKER,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
index d8ca50a..8a441b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
@@ -17,7 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
 
-
+/**
+ *  execute exception
+ */
 public class ExecuteException extends Exception{
 
     public ExecuteException() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index 65ed15e..e1f0c3c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -20,17 +20,26 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 
-
+/**
+ *  abstract executor manager
+ */
 public abstract class AbstractExecutorManager implements ExecutorManager{
 
+    /**
+     * before execute hook, currently a no-op (TODO: add time monitor and timeout)
+     * @param context context
+     * @throws ExecuteException
+     */
     @Override
-    public void beforeExecute(ExecutionContext executeContext) throws ExecuteException {
-        //TODO add time monitor
+    public void beforeExecute(ExecutionContext context) throws ExecuteException {
     }
 
+    /**
+     * after execute hook, currently a no-op (TODO: add dispatch monitor)
+     * @param context context
+     * @throws ExecuteException
+     */
     @Override
-    public void afterExecute(ExecutionContext executeContext) throws ExecuteException {
-        //TODO add dispatch monitor
-
+    public void afterExecute(ExecutionContext context) throws ExecuteException {
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 98d391e..1d78d2f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -20,12 +20,29 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 
-
+/**
+ *  executor manager
+ */
 public interface ExecutorManager {
 
+    /**
+     * before execute
+     * @param executeContext executeContext
+     * @throws ExecuteException
+     */
     void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
 
-    void execute(ExecutionContext executeContext) throws ExecuteException;
+    /**
+     * execute task
+     * @param context context
+     * @throws ExecuteException
+     */
+    void execute(ExecutionContext context) throws ExecuteException;
 
-    void afterExecute(ExecutionContext executeContext) throws ExecuteException;
+    /**
+     * after execute
+     * @param context context
+     * @throws ExecuteException
+     */
+    void afterExecute(ExecutionContext context) throws ExecuteException;
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index e24bbe7..cf1a264 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -42,15 +42,23 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-
+/**
+ *  netty executor manager
+ */
 @Service
 public class NettyExecutorManager extends AbstractExecutorManager{
 
     private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
 
+    /**
+     * zookeeper node manager
+     */
     @Autowired
     private ZookeeperNodeManager zookeeperNodeManager;
 
+    /**
+     * netty remote client
+     */
     private final NettyRemotingClient nettyRemotingClient;
 
     public NettyExecutorManager(){
@@ -60,29 +68,48 @@ public class NettyExecutorManager extends AbstractExecutorManager{
         this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
     }
 
+    /**
+     *  execute logic
+     * @param context context
+     * @throws ExecuteException
+     */
     @Override
-    public void execute(ExecutionContext executeContext) throws ExecuteException {
-        Set<String> allNodes = getAllNodes(executeContext);
+    public void execute(ExecutionContext context) throws ExecuteException {
+
+        /**
+         *  all nodes
+         */
+        Set<String> allNodes = getAllNodes(context);
+
+        /**
+         * fail nodes
+         */
         Set<String> failNodeSet = new HashSet<>();
-        //
-        Command command = buildCommand(executeContext);
-        Host host = executeContext.getHost();
+
+        /**
+         *  build command according to the execution context
+         */
+        Command command = buildCommand(context);
+
+        /**
+         * execute task host
+         */
+        Host host = context.getHost();
         boolean success = false;
-        //
         while (!success) {
             try {
-                doExecute(host, command);
+                doExecute(host,command);
                 success = true;
-                executeContext.setHost(host);
+                context.setHost(host);
             } catch (ExecuteException ex) {
-                logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
+                logger.error(String.format("execute context : %s error", context.getContext()), ex);
                 try {
                     failNodeSet.add(host.getAddress());
                     Set<String> tmpAllIps = new HashSet<>(allNodes);
                     Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                     if (remained != null && remained.size() > 0) {
                         host = Host.of(remained.iterator().next());
-                        logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
+                        logger.error("retry execute context : {} host : {}", context.getContext(), host);
                     } else {
                         throw new ExecuteException("fail after try all nodes");
                     }
@@ -93,6 +120,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{
         }
     }
 
+    /**
+     *  build command
+     * @param context context
+     * @return command
+     */
     private Command buildCommand(ExecutionContext context) {
         ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
         ExecutorType executorType = context.getExecutorType();
@@ -110,7 +142,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
         return requestCommand.convert2Command();
     }
 
+    /**
+     *  execute logic
+     * @param host host
+     * @param command command
+     * @throws ExecuteException
+     */
     private void doExecute(final Host host, final Command command) throws ExecuteException {
+        /**
+         * retry count, default is 3
+         */
         int retryCount = 3;
         boolean success = false;
         do {
@@ -131,8 +172,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
         }
     }
 
+    /**
+     *  get all nodes
+     * @param context context
+     * @return nodes
+     */
     private Set<String> getAllNodes(ExecutionContext context){
         Set<String> nodes = Collections.EMPTY_SET;
+        /**
+         * executor type
+         */
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
index 8708273..ec65cab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -21,8 +21,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 
+/**
+ *  host manager
+ */
 public interface HostManager {
 
+    /**
+     *  select host
+     * @param context context
+     * @return host
+     */
     Host select(ExecutionContext context);
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 1c222b8..3bb001e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -34,24 +34,44 @@ import java.util.Collection;
 import java.util.List;
 
 
+/**
+ *  round robin host manager
+ */
 @Service
 public class RoundRobinHostManager implements HostManager {
 
     private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
 
+    /**
+     * zookeeperNodeManager
+     */
     @Autowired
     private ZookeeperNodeManager zookeeperNodeManager;
 
+    /**
+     * selector
+     */
     private final Selector<Host> selector;
 
+    /**
+     * set round robin
+     */
     public RoundRobinHostManager(){
         this.selector = new RoundRobinSelector<>();
     }
 
+    /**
+     * select host
+     * @param context context
+     * @return host
+     */
     @Override
     public Host select(ExecutionContext context){
         Host host = new Host();
         Collection<String> nodes = null;
+        /**
+         * executor type
+         */
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
@@ -69,6 +89,9 @@ public class RoundRobinHostManager implements HostManager {
         List<Host> candidateHosts = new ArrayList<>(nodes.size());
         nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
 
+        /**
+         * select
+         */
         return selector.select(candidateHosts);
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index cf8c0e8..be52fcb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -20,7 +20,10 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 import java.util.Collection;
 import java.util.Random;
 
-
+/**
+ * random selector
+ * @param <T> T
+ */
 public class RandomSelector<T> implements Selector<T> {
 
     private final Random random = new Random();
@@ -32,11 +35,17 @@ public class RandomSelector<T> implements Selector<T> {
             throw new IllegalArgumentException("Empty source.");
         }
 
+        /**
+         * if only one , return directly
+         */
         if (source.size() == 1) {
             return (T) source.toArray()[0];
         }
 
         int size = source.size();
+        /**
+         *  random select
+         */
         int randomIndex = random.nextInt(size);
 
         return (T) source.toArray()[randomIndex];
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index 90319de..1eb30c8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -21,6 +21,10 @@ import org.springframework.stereotype.Service;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * round robin selector
+ * @param <T> T
+ */
 @Service
 public class RoundRobinSelector<T> implements Selector<T> {
 
@@ -32,11 +36,17 @@ public class RoundRobinSelector<T> implements Selector<T> {
             throw new IllegalArgumentException("Empty source.");
         }
 
+        /**
+         * if only one , return directly
+         */
         if (source.size() == 1) {
             return (T)source.toArray()[0];
         }
 
         int size = source.size();
+        /**
+         * round robin
+         */
         return (T) source.toArray()[index.getAndIncrement() % size];
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index bd7c4ac..0864981 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -20,7 +20,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 import java.util.Collection;
 
 
+/**
+ * selector
+ * @param <T> T
+ */
 public interface Selector<T> {
 
+    /**
+     * select
+     * @param source source
+     * @return T
+     */
     T select(Collection<T> source);
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
index 32fb55f..0c6d740 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ *  task future
+ */
 public class TaskFuture {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class);
@@ -139,19 +142,6 @@ public class TaskFuture {
     }
 
 
-    @Override
-    public String toString() {
-        return "ResponseFuture{" +
-                "opaque=" + opaque +
-                ", timeoutMillis=" + timeoutMillis +
-                ", latch=" + latch +
-                ", beginTimestamp=" + beginTimestamp +
-                ", responseCommand=" + responseCommand +
-                ", sendOk=" + sendOk +
-                ", cause=" + cause +
-                '}';
-    }
-
     /**
      * scan future table
      */
@@ -168,4 +158,17 @@ public class TaskFuture {
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return "TaskFuture{" +
+                "opaque=" + opaque +
+                ", timeoutMillis=" + timeoutMillis +
+                ", latch=" + latch +
+                ", beginTimestamp=" + beginTimestamp +
+                ", responseCommand=" + responseCommand +
+                ", sendOk=" + sendOk +
+                ", cause=" + cause +
+                '}';
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index f5f2123..83da3b0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -46,11 +46,19 @@ public class TaskAckProcessor implements NettyRequestProcessor {
         this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
+    /**
+     *  task ack process
+     * @param channel channel channel
+     * @param command command ExecuteTaskAckCommand
+     */
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
         ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
         logger.info("taskAckCommand : {}",taskAckCommand);
+        /**
+         * change Task state
+         */
         processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
                 taskAckCommand.getStartTime(),
                 taskAckCommand.getHost(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index c7a2d0b..1d6808d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -33,37 +33,80 @@ import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-
+/**
+ *  zookeeper node manager
+ */
 @Service
 public class ZookeeperNodeManager implements InitializingBean {
 
     private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
 
+    /**
+     *  master lock
+     */
     private final Lock masterLock = new ReentrantLock();
 
+    /**
+     *  worker lock
+     */
     private final Lock workerLock = new ReentrantLock();
 
+    /**
+     *  worker nodes
+     */
     private final Set<String> workerNodes = new HashSet<>();
 
+    /**
+     *  master nodes
+     */
     private final Set<String> masterNodes = new HashSet<>();
 
+    /**
+     * zookeeper registry center
+     */
     @Autowired
     private ZookeeperRegistryCenter registryCenter;
 
+    /**
+     *  init listener
+     * @throws Exception
+     */
     @Override
     public void afterPropertiesSet() throws Exception {
+        /**
+         *  load nodes from zookeeper
+         */
         load();
+        /**
+         * init MasterNodeListener listener
+         */
         registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
+        /**
+         * init WorkerNodeListener listener
+         */
         registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
     }
 
+    /**
+     *  load nodes from zookeeper
+     */
     private void load(){
-        Set<String> schedulerNodes = registryCenter.getMasterNodesDirectly();
-        syncMasterNodes(schedulerNodes);
+        /**
+         * master nodes from zookeeper
+         */
+        Set<String> masterNodes = registryCenter.getMasterNodesDirectly();
+        syncMasterNodes(masterNodes);
+
+        /**
+         * worker nodes from zookeeper
+         */
         Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
         syncWorkerNodes(workersNodes);
     }
 
+    /**
+     *  worker node listener
+     */
     class WorkerNodeListener extends AbstractListener {
 
         @Override
@@ -91,6 +134,9 @@ public class ZookeeperNodeManager implements InitializingBean {
     }
 
 
+    /**
+     *  master node listener
+     */
     class MasterNodeListener extends AbstractListener {
 
         @Override
@@ -115,6 +161,10 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     *  get master nodes
+     * @return master nodes
+     */
     public Set<String> getMasterNodes() {
         masterLock.lock();
         try {
@@ -124,6 +174,10 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     *  sync master nodes
+     * @param nodes master nodes
+     */
     private void syncMasterNodes(Set<String> nodes){
         masterLock.lock();
         try {
@@ -134,6 +188,10 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     *  sync worker nodes
+     * @param nodes worker nodes
+     */
     private void syncWorkerNodes(Set<String> nodes){
         workerLock.lock();
         try {
@@ -144,6 +202,10 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     * get worker nodes
+     * @return worker nodes
+     */
     public Set<String> getWorkerNodes(){
         workerLock.lock();
         try {
@@ -153,6 +215,9 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     *  close
+     */
     public void close(){
         registryCenter.close();
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3364a94..7d7e2ef 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -27,17 +27,32 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ *  zookeeper registry center
+ */
 @Service
 public class ZookeeperRegistryCenter implements InitializingBean {
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+    /**
+     * namespace
+     */
     public static final String NAMESPACE = "/dolphinscheduler";
 
+    /**
+     * nodes namespace
+     */
     public static final String NODES = NAMESPACE + "/nodes";
 
+    /**
+     * master path
+     */
     public static final String MASTER_PATH = NODES + "/master";
 
+    /**
+     * worker path
+     */
     public static final String WORKER_PATH = NODES + "/worker";
 
     public static final String EMPTY = "";
@@ -50,19 +65,26 @@ public class ZookeeperRegistryCenter implements InitializingBean {
         init();
     }
 
+    /**
+     * init once: persist the master and worker paths
+     */
     public void init() {
         if (isStarted.compareAndSet(false, true)) {
-            //TODO
-//            zookeeperCachedOperator.start(NODES);
             initNodes();
         }
     }
 
+    /**
+     * init nodes
+     */
     private void initNodes() {
         zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
         zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
     }
 
+    /**
+     * close
+     */
     public void close() {
         if (isStarted.compareAndSet(true, false)) {
             if (zookeeperCachedOperator != null) {
@@ -71,36 +93,71 @@ public class ZookeeperRegistryCenter implements InitializingBean {
         }
     }
 
+    /**
+     * get master path
+     * @return master path
+     */
     public String getMasterPath() {
         return MASTER_PATH;
     }
 
+    /**
+     * get worker path
+     * @return worker path
+     */
     public String getWorkerPath() {
         return WORKER_PATH;
     }
 
+    /**
+     *  get master nodes directly
+     * @return master nodes
+     */
     public Set<String> getMasterNodesDirectly() {
         List<String> masters = getChildrenKeys(MASTER_PATH);
         return new HashSet<>(masters);
     }
 
+    /**
+     *  get worker nodes directly
+     * @return worker nodes
+     */
     public Set<String> getWorkerNodesDirectly() {
         List<String> workers = getChildrenKeys(WORKER_PATH);
         return new HashSet<>(workers);
     }
 
+    /**
+     * whether the given path contains the worker path
+     * @param path path
+     * @return result
+     */
     public boolean isWorkerPath(String path) {
         return path != null && path.contains(WORKER_PATH);
     }
 
+    /**
+     * whether the given path contains the master path
+     * @param path path
+     * @return result
+     */
     public boolean isMasterPath(String path) {
         return path != null && path.contains(MASTER_PATH);
     }
 
+    /**
+     * get children nodes
+     * @param key key
+     * @return children nodes
+     */
     public List<String> getChildrenKeys(final String key) {
         return zookeeperCachedOperator.getChildrenKeys(key);
     }
 
+    /**
+     * get zookeeperCachedOperator
+     * @return zookeeperCachedOperator
+     */
     public ZookeeperCachedOperator getZookeeperCachedOperator() {
         return zookeeperCachedOperator;
     }