You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/19 02:11:52 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#1975)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new f9500c5  Refactor worker (#1975)
f9500c5 is described below

commit f9500c58b174f7dbf1c3d073046583157c7d6673
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Feb 19 10:11:41 2020 +0800

    Refactor worker (#1975)
    
    * updates
    
    * move FetchTaskThread logic to WorkerNettyRequestProcessor
    
    * add NettyRemotingClient to scheduler thread
---
 .../remote/command/ExecuteTaskRequestCommand.java  |   2 +-
 .../dolphinscheduler/remote/utils/Constants.java   |   3 +
 .../dolphinscheduler/remote/utils/IPUtils.java     | 142 +++++++++++++++++++++
 .../server/master/runner/MasterExecThread.java     |  28 +++-
 .../master/runner/MasterSchedulerThread.java       |  13 +-
 .../server/registry/ZookeeperRegistryCenter.java   |  83 ++++++++++++
 .../server/worker/WorkerServer.java                |  49 ++++++-
 .../processor/WorkerNettyRequestProcessor.java     | 106 +++++++++++++++
 .../server/worker/registry/WorkerRegistry.java     |  77 +++++++++++
 9 files changed, 494 insertions(+), 9 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index beec055..adfcb1d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.List;
import java.util.concu
 rrent.atomic.AtomicLong;

/**
 *  execute task request command
 */
public class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task id
     */
    private String taskId;

    /**
     *  attempt id
     */
    private String attemptId;

    /**
     *  application name
     */
    private String applicationName;

    /**
     *  group name
     */
    private String groupName;

    /**
     *  task name
     */
    private String taskName;

    /**
     *  connector port
     */
    private int connectorPort;

    /**
     *  description info
     */
    private String description;

    /**
     *  class name
     */
    private String className;

    /**
     *  method name
     */
    private String methodName;

    /**
     *  parameters
     */
    private String params;

    /**
     *  shard itemds
     */
    private List<Integer> shardItems;

    public List<Integer> getShardItems() {
        return shardItems;
    }

    public void setShardItems(List<
 Integer> shardItems) {
        this.shardItems = shardItems;
    }

    public String getParams() {
        return params;
    }

    public void setParams(String params) {
        this.params = params;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public int getConnectorPort() {
        return connectorPort;
    }

    public void setConnectorPort(int connectorPort) {
        
 this.connectorPort = connectorPort;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
pub
 lic class ExecuteTaskRequestCommand implements Serializable {


    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 5733b17..99fbb94 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -38,4 +38,7 @@ public class Constants {
      */
     public static final int CPUS = Runtime.getRuntime().availableProcessors();
 
+
+    public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address();
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
new file mode 100644
index 0000000..2fa82fd
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class IPUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);
+
+    private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
+
+    private static String LOCAL_HOST = "unknown";
+
+    static {
+        String host = System.getenv("HOSTNAME");
+        if (isNotEmpty(host)) {
+            LOCAL_HOST = host;
+        } else {
+
+            try {
+                String hostName = InetAddress.getLocalHost().getHostName();
+                if (isNotEmpty(hostName)) {
+                    LOCAL_HOST = hostName;
+                }
+            } catch (UnknownHostException e) {
+                logger.error("get hostName error!", e);
+            }
+        }
+    }
+
+    public static String getLocalHost() {
+        return LOCAL_HOST;
+    }
+
+
+    public static String getFirstNoLoopbackIP4Address() {
+        Collection<String> allNoLoopbackIP4Addresses = getNoLoopbackIP4Addresses();
+        if (allNoLoopbackIP4Addresses.isEmpty()) {
+            return null;
+        }
+        return allNoLoopbackIP4Addresses.iterator().next();
+    }
+
+    public static Collection<String> getNoLoopbackIP4Addresses() {
+        Collection<String> noLoopbackIP4Addresses = new ArrayList<>();
+        Collection<InetAddress> allInetAddresses = getAllHostAddress();
+
+        for (InetAddress address : allInetAddresses) {
+            if (!address.isLoopbackAddress() && !address.isSiteLocalAddress()
+                    && !Inet6Address.class.isInstance(address)) {
+                noLoopbackIP4Addresses.add(address.getHostAddress());
+            }
+        }
+        if (noLoopbackIP4Addresses.isEmpty()) {
+            for (InetAddress address : allInetAddresses) {
+                if (!address.isLoopbackAddress() && !Inet6Address.class.isInstance(address)) {
+                    noLoopbackIP4Addresses.add(address.getHostAddress());
+                }
+            }
+        }
+        return noLoopbackIP4Addresses;
+    }
+
+    public static Collection<InetAddress> getAllHostAddress() {
+        try {
+            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
+            Collection<InetAddress> addresses = new ArrayList<>();
+
+            while (networkInterfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = networkInterfaces.nextElement();
+                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    addresses.add(inetAddress);
+                }
+            }
+
+            return addresses;
+        } catch (SocketException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static String getIpByHostName(String host) {
+        InetAddress address = null;
+        try {
+            address = InetAddress.getByName(host);
+        } catch (UnknownHostException e) {
+            logger.error("get IP error", e);
+        }
+        if (address == null) {
+            return "";
+        }
+        return address.getHostAddress();
+
+    }
+
+    private static boolean isEmpty(final CharSequence cs) {
+        return cs == null || cs.length() == 0;
+    }
+
+    private static boolean isNotEmpty(final CharSequence cs) {
+        return !isEmpty(cs);
+    }
+
+    public static boolean isIp(String addr) {
+        if (addr.length() < 7 || addr.length() > 15 || "".equals(addr)) {
+            return false;
+        }
+
+        Pattern pat = Pattern.compile(IP_REGEX);
+
+        Matcher mat = pat.matcher(addr);
+
+        boolean ipAddress = mat.find();
+
+        return ipAddress;
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index f5e3121..be83217 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -32,6 +32,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Address;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.AlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -135,11 +140,16 @@ public class MasterExecThread implements Runnable {
     private MasterConfig masterConfig;
 
     /**
+     *
+     */
+    private NettyRemotingClient nettyRemotingClient;
+
+    /**
      * constructor of MasterExecThread
      * @param processInstance   process instance
      * @param processService        process dao
      */
-    public MasterExecThread(ProcessInstance processInstance, ProcessService processService){
+    public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
         this.processService = processService;
 
         this.processInstance = processInstance;
@@ -147,6 +157,22 @@ public class MasterExecThread implements Runnable {
         int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
         this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
                 masterTaskExecNum);
+        this.nettyRemotingClient = nettyRemotingClient;
+    }
+
+    //TODO
+    /**端口,默认是123456
+     * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。
+     */
+    private void sendToWorker(){
+        final Address address = new Address("localhost", 12346);
+        ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
+        try {
+            Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
+            //结果可能为空,所以不用管,能发过去,就行。
+        } catch (InterruptedException | RemotingException ex) {
+            logger.error(String.format("send command to : %s error", address), ex);
+        }
     }
 
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index c0ddb1c..6e96164 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -70,6 +72,11 @@ public class MasterSchedulerThread implements Runnable {
      */
     private MasterConfig masterConfig;
 
+    /**
+     *  netty remoting client
+     */
+    private NettyRemotingClient nettyRemotingClient;
+
 
     /**
      * constructor of MasterSchedulerThread
@@ -83,6 +90,9 @@ public class MasterSchedulerThread implements Runnable {
         this.masterExecThreadNum = masterExecThreadNum;
         this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
         this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+        //
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
     }
 
     /**
@@ -123,7 +133,7 @@ public class MasterSchedulerThread implements Runnable {
                             processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
                             if (processInstance != null) {
                                 logger.info("start master exec thread , split DAG ...");
-                                masterExecService.execute(new MasterExecThread(processInstance, processService));
+                                masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
                             }
                         }catch (Exception e){
                             logger.error("scan command error ", e);
@@ -140,6 +150,7 @@ public class MasterSchedulerThread implements Runnable {
                 AbstractZKClient.releaseMutex(mutex);
             }
         }
+        nettyRemotingClient.close();
         logger.info("master server stopped...");
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
new file mode 100644
index 0000000..68c19ea
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Service
+public class ZookeeperRegistryCenter implements InitializingBean {
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    public static final String NAMESPACE = "/dolphinscheduler";
+
+    public static final String NODES = NAMESPACE + "/nodes";
+
+    public static final String MASTER_PATH = NODES + "/master";
+
+    public static final String WORKER_PATH = NODES + "/worker";
+
+    public static final String EMPTY = "";
+
+    @Autowired
+    protected ZookeeperCachedOperator zookeeperCachedOperator;
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+
+    }
+
+    public void init() {
+        if (isStarted.compareAndSet(false, true)) {
+            //TODO
+//            zookeeperCachedOperator.start(NODES);
+            initNodes();
+        }
+    }
+
+    private void initNodes() {
+        zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
+        zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+    }
+
+    public void close() {
+        if (isStarted.compareAndSet(true, false)) {
+            if (zookeeperCachedOperator != null) {
+                zookeeperCachedOperator.close();
+            }
+        }
+    }
+
+    public String getMasterPath() {
+        return MASTER_PATH;
+    }
+
+    public String getWorkerPath() {
+        return WORKER_PATH;
+    }
+
+    public ZookeeperCachedOperator getZookeeperCachedOperator() {
+        return zookeeperCachedOperator;
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ace9307..d014f1e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,8 +29,14 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
 import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -112,10 +118,29 @@ public class WorkerServer implements IStoppable {
     @Value("${server.is-combined-server:false}")
     private Boolean isCombinedServer;
 
+    /**
+     *  worker config
+     */
     @Autowired
     private WorkerConfig workerConfig;
 
     /**
+     *  zookeeper registry center
+     */
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    /**
+     *  netty remote server
+     */
+    private NettyRemotingServer nettyRemotingServer;
+
+    /**
+     *  worker registry
+     */
+    private WorkerRegistry workerRegistry;
+
+    /**
      *  spring application context
      *  only use it for initialization
      */
@@ -141,7 +166,17 @@ public class WorkerServer implements IStoppable {
     public void run(){
         logger.info("start worker server...");
 
-        zkWorkerClient.init();
+        //init remoting server
+        NettyServerConfig serverConfig = new NettyServerConfig();
+        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService));
+        this.nettyRemotingServer.start();
+
+        //worker registry
+        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
+        this.workerRegistry.registry();
+
+        this.zkWorkerClient.init();
 
         this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
 
@@ -167,10 +202,10 @@ public class WorkerServer implements IStoppable {
         killExecutorService.execute(killProcessThread);
 
         // new fetch task thread
-        FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
-
-        // submit fetch task thread
-        fetchTaskExecutorService.execute(fetchTaskThread);
+//        FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
+//
+//        // submit fetch task thread
+//        fetchTaskExecutorService.execute(fetchTaskThread);
 
         /**
          * register hooks, which are called before the process exits
@@ -217,6 +252,9 @@ public class WorkerServer implements IStoppable {
                 logger.warn("thread sleep exception", e);
             }
 
+            this.nettyRemotingServer.close();
+            this.workerRegistry.unRegistry();
+
             try {
                 heartbeatWorkerService.shutdownNow();
             }catch (Exception e){
@@ -260,7 +298,6 @@ public class WorkerServer implements IStoppable {
         }
     }
 
-
     /**
      * heartbeat thread implement
      *
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
new file mode 100644
index 0000000..c0db034
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+
+
+public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class);
+
+    private final ProcessService processService;
+
+    private final ExecutorService workerExecService;
+
+    private final WorkerConfig workerConfig;
+
+    public WorkerNettyRequestProcessor(ProcessService processService){
+        this.processService = processService;
+        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
+    }
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        logger.debug("received command : {}", command);
+        TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
+        int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+        // verify tenant is null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
+            return;
+        }
+        // set queue for process instance, user-specified queue takes precedence over tenant queue
+        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        // local execute path
+        String execLocalPath = getExecLocalPath(taskInstance);
+        logger.info("task instance  local execute path : {} ", execLocalPath);
+        // init task
+        taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
+        try {
+            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
+        } catch (Exception ex){
+            logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
+        }
+        // submit task
+        workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
+    }
+
+    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+        if(tenant == null){
+            logger.error("tenant not exists,process instance id : {},task instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
new file mode 100644
index 0000000..396b009
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class WorkerRegistry {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
+
+    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    private final int port;
+
+    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
+        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+        this.port = port;
+    }
+
+    public void registry() {
+        String address = Constants.LOCAL_ADDRESS;
+        String localNodePath = getWorkerPath();
+        zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
+        zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                if(newState == ConnectionState.LOST){
+                    logger.error("worker : {} connection lost from zookeeper", address);
+                } else if(newState == ConnectionState.RECONNECTED){
+                    logger.info("worker : {} reconnected to zookeeper", address);
+                    zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
+                } else if(newState == ConnectionState.SUSPENDED){
+                    logger.warn("worker : {} connection SUSPENDED ", address);
+                }
+            }
+        });
+        logger.info("scheduler node : {} registry to ZK successfully.", address);
+    }
+
+    public void unRegistry() {
+        String address = getLocalAddress();
+        String localNodePath = getWorkerPath();
+        zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+        logger.info("worker node : {} unRegistry to ZK.", address);
+    }
+
+    private String getWorkerPath() {
+        String address = getLocalAddress();
+        String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
+        return localNodePath;
+    }
+
+    private String getLocalAddress(){
+        return Constants.LOCAL_ADDRESS + ":" + port;
+    }
+}