You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/01 13:42:32 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#2044)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 9d36eaf  Refactor worker (#2044)
9d36eaf is described below

commit 9d36eaf6df627e9fc564a7c57182052ddf754eec
Author: Tboy <gu...@immomo.com>
AuthorDate: Sun Mar 1 21:42:23 2020 +0800

    Refactor worker (#2044)
    
    * Refactor worker (#10)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * Refactor worker (#2018)
    
    * Refactor worker (#7)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * Refactor worker (#8)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * add kill command
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatcht task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * refactor heartbeat logic
    
    * update registry and add worker group
    
    * add worker group
    
    * add lowerWeight host manager
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../server/master/config/MasterConfig.java         |  11 ++
 .../server/master/dispatch/ExecutorDispatcher.java |  14 +-
 ...obinHostManager.java => CommonHostManager.java} |  36 ++---
 .../master/dispatch/host/HostManagerConfig.java    |  64 ++++++++
 .../dispatch/host/LowerWeightHostManager.java      | 171 +++++++++++++++++++++
 .../master/dispatch/host/RandomHostManager.java    |  48 ++++++
 .../dispatch/host/RoundRobinHostManager.java       |  55 +------
 .../master/dispatch/host/assign/HostSelector.java  |  39 +++++
 .../master/dispatch/host/assign/HostWeight.java    |  73 +++++++++
 .../host/assign/LowerWeightRoundRobin.java         |  46 ++++++
 .../server/registry/ZookeeperNodeManager.java      |   5 +
 .../dispatch/host/LowerWeightRoundRobinTest.java   |  43 ++++++
 12 files changed, 528 insertions(+), 77 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index efb7cff..e8a8ecb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -43,6 +43,17 @@ public class MasterConfig {
     @Value("${master.reserved.memory}")
     private double masterReservedMemory;
 
+    @Value("${master.host.selector:lowerWeight}")
+    private String hostSelector;
+
+    public String getHostSelector() {
+        return hostSelector;
+    }
+
+    public void setHostSelector(String hostSelector) {
+        this.hostSelector = hostSelector;
+    }
+
     public int getMasterExecThreads() {
         return masterExecThreads;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 8a803a2..c597dc1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -20,12 +20,13 @@ package org.apache.dolphinscheduler.server.master.dispatch;
 
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
+import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -44,14 +45,23 @@ public class ExecutorDispatcher implements InitializingBean {
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      * round robin host manager
      */
     @Autowired
-    private RoundRobinHostManager hostManager;
+    private HostManager hostManager;
 
+    /**
+     * executor manager
+     */
     private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
 
+    /**
+     * constructor
+     */
     public ExecutorDispatcher(){
         this.executorManagers = new ConcurrentHashMap<>();
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
similarity index 77%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index a573632..080ce7a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -21,13 +21,10 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,28 +34,15 @@ import java.util.List;
 /**
  *  round robin host manager
  */
-@Service
-public class RoundRobinHostManager implements HostManager {
+public abstract class CommonHostManager implements HostManager {
 
-    private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
+    private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class);
 
     /**
      * zookeeperNodeManager
      */
     @Autowired
-    private ZookeeperNodeManager zookeeperNodeManager;
-
-    /**
-     * selector
-     */
-    private final Selector<Host> selector;
-
-    /**
-     * set round robin
-     */
-    public RoundRobinHostManager(){
-        this.selector = new RoundRobinSelector<>();
-    }
+    protected ZookeeperNodeManager zookeeperNodeManager;
 
     /**
      * select host
@@ -89,10 +73,16 @@ public class RoundRobinHostManager implements HostManager {
         List<Host> candidateHosts = new ArrayList<>(nodes.size());
         nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
 
-        /**
-         * select
-         */
-        return selector.select(candidateHosts);
+        return select(candidateHosts);
+    }
+
+    public abstract Host select(Collection<Host> nodes);
+
+    public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
+        this.zookeeperNodeManager = zookeeperNodeManager;
     }
 
+    public ZookeeperNodeManager getZookeeperNodeManager() {
+        return zookeeperNodeManager;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java
new file mode 100644
index 0000000..458a1ee
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * host manager config
+ */
+@Configuration
+public class HostManagerConfig {
+
+    private AutowireCapableBeanFactory beanFactory;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    public HostManagerConfig(AutowireCapableBeanFactory beanFactory) {
+        this.beanFactory = beanFactory;
+    }
+
+    @Bean
+    public HostManager hostManager() {
+        String hostSelector = masterConfig.getHostSelector();
+        HostSelector selector = HostSelector.of(hostSelector);
+        HostManager hostManager;
+        switch (selector){
+            case RANDOM:
+                hostManager = new RandomHostManager();
+                break;
+            case ROUNDROBIN:
+                hostManager = new RoundRobinHostManager();
+                break;
+            case LOWERWEIGHT:
+                hostManager = new LowerWeightHostManager();
+                break;
+            default:
+                throw new IllegalArgumentException("unSupport selector " + hostSelector);
+        }
+        beanFactory.autowireBean(hostManager);
+        return hostManager;
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
new file mode 100644
index 0000000..99cae69
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+
+
+/**
+ *  round robin host manager
+ */
+public class LowerWeightHostManager extends CommonHostManager {
+
+    private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
+
+    /**
+     * zookeeper registry center
+     */
+    @Autowired
+    private ZookeeperRegistryCenter registryCenter;
+
+    /**
+     * round robin host manager
+     */
+    private RoundRobinHostManager roundRobinHostManager;
+
+    /**
+     * selector
+     */
+    private LowerWeightRoundRobin selector;
+
+    /**
+     * worker host weights
+     */
+    private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
+
+    /**
+     * worker group host lock
+     */
+    private Lock lock;
+
+    /**
+     * executor service
+     */
+    private ScheduledExecutorService executorService;
+
+    @PostConstruct
+    public void init(){
+        this.selector = new LowerWeightRoundRobin();
+        this.workerHostWeights = new ConcurrentHashMap<>();
+        this.lock = new ReentrantLock();
+        this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
+        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
+        this.roundRobinHostManager = new RoundRobinHostManager();
+        this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
+    }
+
+    @PreDestroy
+    public void close(){
+        this.executorService.shutdownNow();
+    }
+
+    /**
+     * select host
+     * @param context context
+     * @return host
+     */
+    @Override
+    public Host select(ExecutionContext context){
+        Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
+        if(CollectionUtils.isNotEmpty(workerHostWeights)){
+            return selector.select(workerHostWeights).getHost();
+        } else{
+            return roundRobinHostManager.select(context);
+        }
+    }
+
+    @Override
+    public Host select(Collection<Host> nodes) {
+        throw new UnsupportedOperationException("not support");
+    }
+
+    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
+        lock.lock();
+        try {
+            workerHostWeights.clear();
+            workerHostWeights.putAll(workerHostWeights);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private Set<HostWeight> getWorkerHostWeights(String workerGroup){
+        lock.lock();
+        try {
+            return workerHostWeights.get(workerGroup);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    class RefreshResourceTask implements Runnable{
+
+        @Override
+        public void run() {
+            try {
+                Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
+                Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
+                Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
+                for(Map.Entry<String, Set<String>> entry : entries){
+                    String workerGroup = entry.getKey();
+                    Set<String> nodes = entry.getValue();
+                    String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
+                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+                    for(String node : nodes){
+                        String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+                        if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){
+                            String[] parts = heartbeat.split(COMMA);
+                            double cpu = Double.parseDouble(parts[0]);
+                            double memory = Double.parseDouble(parts[1]);
+                            double loadAverage = Double.parseDouble(parts[2]);
+                            HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage);
+                            hostWeights.add(hostWeight);
+                        }
+                    }
+                    workerHostWeights.put(workerGroup, hostWeights);
+                }
+                syncWorkerHostWeight(workerHostWeights);
+            } catch (Throwable ex){
+                logger.error("RefreshResourceTask error", ex);
+            }
+        }
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
new file mode 100644
index 0000000..ef2b6fd
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
+
+import java.util.Collection;
+
+
+/**
+ *  round robin host manager
+ */
+public class RandomHostManager extends CommonHostManager {
+
+    /**
+     * selector
+     */
+    private final Selector<Host> selector;
+
+    /**
+     * set round robin
+     */
+    public RandomHostManager(){
+        this.selector = new RandomSelector<>();
+    }
+
+    @Override
+    public Host select(Collection<Host> nodes) {
+        return selector.select(nodes);
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index a573632..e9fef49 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -17,36 +17,17 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
-import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
 
 /**
  *  round robin host manager
  */
-@Service
-public class RoundRobinHostManager implements HostManager {
-
-    private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
-
-    /**
-     * zookeeperNodeManager
-     */
-    @Autowired
-    private ZookeeperNodeManager zookeeperNodeManager;
+public class RoundRobinHostManager extends CommonHostManager {
 
     /**
      * selector
@@ -60,39 +41,9 @@ public class RoundRobinHostManager implements HostManager {
         this.selector = new RoundRobinSelector<>();
     }
 
-    /**
-     * select host
-     * @param context context
-     * @return host
-     */
     @Override
-    public Host select(ExecutionContext context){
-        Host host = new Host();
-        Collection<String> nodes = null;
-        /**
-         * executor type
-         */
-        ExecutorType executorType = context.getExecutorType();
-        switch (executorType){
-            case WORKER:
-                nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
-                break;
-            case CLIENT:
-                break;
-            default:
-                throw new IllegalArgumentException("invalid executorType : " + executorType);
-
-        }
-        if(CollectionUtils.isEmpty(nodes)){
-            return host;
-        }
-        List<Host> candidateHosts = new ArrayList<>(nodes.size());
-        nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
-
-        /**
-         * select
-         */
-        return selector.select(candidateHosts);
+    public Host select(Collection<Host> nodes) {
+        return selector.select(nodes);
     }
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
new file mode 100644
index 0000000..145393e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+/**
+ * host selector
+ */
+public enum HostSelector {
+
+    RANDOM,
+
+    ROUNDROBIN,
+
+    LOWERWEIGHT;
+
+    public static HostSelector of(String selector){
+        for(HostSelector hs : values()){
+            if(hs.name().equalsIgnoreCase(selector)){
+                return hs;
+            }
+        }
+        throw new IllegalArgumentException("invalid host selector : " + selector);
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
new file mode 100644
index 0000000..ebceea7
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * host weight
+ */
+public class HostWeight {
+
+    private final int CPU_FACTOR = 10;
+
+    private final int MEMORY_FACTOR = 20;
+
+    private final int LOAD_AVERAGE_FACTOR = 70;
+
+    private final Host host;
+
+    private final int weight;
+
+    private int currentWeight;
+
+    public HostWeight(Host host, double cpu, double memory, double loadAverage) {
+        this.weight = calculateWeight(cpu, memory, loadAverage);
+        this.host = host ;
+        this.currentWeight = weight ;
+    }
+
+    public int getCurrentWeight() {
+        return currentWeight;
+    }
+
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setCurrentWeight(int currentWeight) {
+        this.currentWeight = currentWeight;
+    }
+
+    public Host getHost() {
+        return host;
+    }
+
+    @Override
+    public String toString() {
+        return "HostWeight{" +
+                "host=" + host +
+                ", weight=" + weight +
+                ", currentWeight=" + currentWeight +
+                '}';
+    }
+
+    private int calculateWeight(double cpu, double memory, double loadAverage){
+        return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
new file mode 100644
index 0000000..cadf418
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import java.util.Collection;
+
+/**
+ *  lower weight round robin
+ */
+public class LowerWeightRoundRobin implements Selector<HostWeight>{
+
+    public HostWeight select(Collection<HostWeight> sources){
+        int totalWeight = 0;
+        int lowWeight = 0;
+        HostWeight lowerNode = null;
+        for (HostWeight hostWeight : sources) {
+            totalWeight += hostWeight.getWeight();
+            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
+            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) {
+                lowerNode = hostWeight;
+                lowWeight = hostWeight.getCurrentWeight();
+            }
+        }
+        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
+        return lowerNode;
+
+    }
+}
+
+
+
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 590a25f..25355e2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -31,6 +31,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
@@ -227,6 +228,10 @@ public class ZookeeperNodeManager implements InitializingBean {
         }
     }
 
+    public Map<String, Set<String>> getWorkerGroupNodes(){
+        return Collections.unmodifiableMap(workerGroupNodes);
+    }
+
     /**
      * get worker group nodes
      * @param workerGroup
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
new file mode 100644
index 0000000..10936a6
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class LowerWeightRoundRobinTest {
+
+
+    @Test
+    public void testSelect(){
+        Collection<HostWeight> sources = new ArrayList<>();
+        sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
+        sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
+        System.out.println(sources);
+        LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
+        for(int i = 0; i < 100; i ++){
+            System.out.println(roundRobin.select(sources));
+        }
+    }
+}