You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/01 13:42:32 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
Refactor worker (#2044)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 9d36eaf Refactor worker (#2044)
9d36eaf is described below
commit 9d36eaf6df627e9fc564a7c57182052ddf754eec
Author: Tboy <gu...@immomo.com>
AuthorDate: Sun Mar 1 21:42:23 2020 +0800
Refactor worker (#2044)
* Refactor worker (#10)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* Refactor worker (#2018)
* Refactor worker (#7)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* Refactor worker (#8)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* add kill command
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* add TaskInstanceCacheManager receive Worker report result, modify master polling db transform to cache (#2021)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* refactor heartbeat logic
* update registry and add worker group
* add worker group
* add lowerWeight host manager
Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
.../server/master/config/MasterConfig.java | 11 ++
.../server/master/dispatch/ExecutorDispatcher.java | 14 +-
...obinHostManager.java => CommonHostManager.java} | 36 ++---
.../master/dispatch/host/HostManagerConfig.java | 64 ++++++++
.../dispatch/host/LowerWeightHostManager.java | 171 +++++++++++++++++++++
.../master/dispatch/host/RandomHostManager.java | 48 ++++++
.../dispatch/host/RoundRobinHostManager.java | 55 +------
.../master/dispatch/host/assign/HostSelector.java | 39 +++++
.../master/dispatch/host/assign/HostWeight.java | 73 +++++++++
.../host/assign/LowerWeightRoundRobin.java | 46 ++++++
.../server/registry/ZookeeperNodeManager.java | 5 +
.../dispatch/host/LowerWeightRoundRobinTest.java | 43 ++++++
12 files changed, 528 insertions(+), 77 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index efb7cff..e8a8ecb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -43,6 +43,17 @@ public class MasterConfig {
@Value("${master.reserved.memory}")
private double masterReservedMemory;
+ @Value("${master.host.selector:lowerWeight}")
+ private String hostSelector;
+
+ public String getHostSelector() {
+ return hostSelector;
+ }
+
+ public void setHostSelector(String hostSelector) {
+ this.hostSelector = hostSelector;
+ }
+
public int getMasterExecThreads() {
return masterExecThreads;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 8a803a2..c597dc1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -20,12 +20,13 @@ package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
-import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
+import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -44,14 +45,23 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private NettyExecutorManager nettyExecutorManager;
+ @Autowired
+ private MasterConfig masterConfig;
+
/**
* round robin host manager
*/
@Autowired
- private RoundRobinHostManager hostManager;
+ private HostManager hostManager;
+ /**
+ * executor manager
+ */
private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
+ /**
+ * constructor
+ */
public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>();
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
similarity index 77%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index a573632..080ce7a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -21,13 +21,10 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,28 +34,15 @@ import java.util.List;
/**
* round robin host manager
*/
-@Service
-public class RoundRobinHostManager implements HostManager {
+public abstract class CommonHostManager implements HostManager {
- private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
+ private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired
- private ZookeeperNodeManager zookeeperNodeManager;
-
- /**
- * selector
- */
- private final Selector<Host> selector;
-
- /**
- * set round robin
- */
- public RoundRobinHostManager(){
- this.selector = new RoundRobinSelector<>();
- }
+ protected ZookeeperNodeManager zookeeperNodeManager;
/**
* select host
@@ -89,10 +73,16 @@ public class RoundRobinHostManager implements HostManager {
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
- /**
- * select
- */
- return selector.select(candidateHosts);
+ return select(candidateHosts);
+ }
+
+ public abstract Host select(Collection<Host> nodes);
+
+ public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
+ this.zookeeperNodeManager = zookeeperNodeManager;
}
+ public ZookeeperNodeManager getZookeeperNodeManager() {
+ return zookeeperNodeManager;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java
new file mode 100644
index 0000000..458a1ee
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * host manager config
+ */
+@Configuration
+public class HostManagerConfig {
+
+ private AutowireCapableBeanFactory beanFactory;
+
+ @Autowired
+ private MasterConfig masterConfig;
+
+ @Autowired
+ public HostManagerConfig(AutowireCapableBeanFactory beanFactory) {
+ this.beanFactory = beanFactory;
+ }
+
+ @Bean
+ public HostManager hostManager() {
+ String hostSelector = masterConfig.getHostSelector();
+ HostSelector selector = HostSelector.of(hostSelector);
+ HostManager hostManager;
+ switch (selector){
+ case RANDOM:
+ hostManager = new RandomHostManager();
+ break;
+ case ROUNDROBIN:
+ hostManager = new RoundRobinHostManager();
+ break;
+ case LOWERWEIGHT:
+ hostManager = new LowerWeightHostManager();
+ break;
+ default:
+ throw new IllegalArgumentException("unSupport selector " + hostSelector);
+ }
+ beanFactory.autowireBean(hostManager);
+ return hostManager;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
new file mode 100644
index 0000000..99cae69
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+
+
+/**
+ * lower weight host manager
+ */
+public class LowerWeightHostManager extends CommonHostManager {
+
+ private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
+
+ /**
+ * zookeeper registry center
+ */
+ @Autowired
+ private ZookeeperRegistryCenter registryCenter;
+
+ /**
+ * round robin host manager
+ */
+ private RoundRobinHostManager roundRobinHostManager;
+
+ /**
+ * selector
+ */
+ private LowerWeightRoundRobin selector;
+
+ /**
+ * worker host weights
+ */
+ private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
+
+ /**
+ * worker group host lock
+ */
+ private Lock lock;
+
+ /**
+ * executor service
+ */
+ private ScheduledExecutorService executorService;
+
+ @PostConstruct
+ public void init(){
+ this.selector = new LowerWeightRoundRobin();
+ this.workerHostWeights = new ConcurrentHashMap<>();
+ this.lock = new ReentrantLock();
+ this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
+ this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
+ this.roundRobinHostManager = new RoundRobinHostManager();
+ this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
+ }
+
+ @PreDestroy
+ public void close(){
+ this.executorService.shutdownNow();
+ }
+
+ /**
+ * select host
+ * @param context context
+ * @return host
+ */
+ @Override
+ public Host select(ExecutionContext context){
+ Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
+ if(CollectionUtils.isNotEmpty(workerHostWeights)){
+ return selector.select(workerHostWeights).getHost();
+ } else{
+ return roundRobinHostManager.select(context);
+ }
+ }
+
+ @Override
+ public Host select(Collection<Host> nodes) {
+ throw new UnsupportedOperationException("not support");
+ }
+
+    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){ // replaces the cached per-worker-group weights with the given snapshot
+        lock.lock(); // same lock as getWorkerHostWeights, so a lookup cannot run between clear() and putAll()
+        try {
+            this.workerHostWeights.clear(); // "this." is required: the parameter shadows the field of the same name
+            this.workerHostWeights.putAll(workerHostWeights); // copy the snapshot into the field, not into itself
+        } finally {
+            lock.unlock();
+        }
+    }
+
+ private Set<HostWeight> getWorkerHostWeights(String workerGroup){
+ lock.lock();
+ try {
+ return workerHostWeights.get(workerGroup);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ class RefreshResourceTask implements Runnable{
+
+ @Override
+ public void run() {
+ try {
+ Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
+ Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
+ Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
+ for(Map.Entry<String, Set<String>> entry : entries){
+ String workerGroup = entry.getKey();
+ Set<String> nodes = entry.getValue();
+ String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
+ Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+ for(String node : nodes){
+ String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+ if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){
+ String[] parts = heartbeat.split(COMMA);
+ double cpu = Double.parseDouble(parts[0]);
+ double memory = Double.parseDouble(parts[1]);
+ double loadAverage = Double.parseDouble(parts[2]);
+ HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage);
+ hostWeights.add(hostWeight);
+ }
+ }
+ workerHostWeights.put(workerGroup, hostWeights);
+ }
+ syncWorkerHostWeight(workerHostWeights);
+ } catch (Throwable ex){
+ logger.error("RefreshResourceTask error", ex);
+ }
+ }
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
new file mode 100644
index 0000000..ef2b6fd
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
+
+import java.util.Collection;
+
+
+/**
+ * random host manager
+ */
+public class RandomHostManager extends CommonHostManager {
+
+ /**
+ * selector
+ */
+ private final Selector<Host> selector;
+
+ /**
+ * set random selector
+ */
+ public RandomHostManager(){
+ this.selector = new RandomSelector<>();
+ }
+
+ @Override
+ public Host select(Collection<Host> nodes) {
+ return selector.select(nodes);
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index a573632..e9fef49 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -17,36 +17,17 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
-import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
/**
* round robin host manager
*/
-@Service
-public class RoundRobinHostManager implements HostManager {
-
- private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
-
- /**
- * zookeeperNodeManager
- */
- @Autowired
- private ZookeeperNodeManager zookeeperNodeManager;
+public class RoundRobinHostManager extends CommonHostManager {
/**
* selector
@@ -60,39 +41,9 @@ public class RoundRobinHostManager implements HostManager {
this.selector = new RoundRobinSelector<>();
}
- /**
- * select host
- * @param context context
- * @return host
- */
@Override
- public Host select(ExecutionContext context){
- Host host = new Host();
- Collection<String> nodes = null;
- /**
- * executor type
- */
- ExecutorType executorType = context.getExecutorType();
- switch (executorType){
- case WORKER:
- nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
- break;
- case CLIENT:
- break;
- default:
- throw new IllegalArgumentException("invalid executorType : " + executorType);
-
- }
- if(CollectionUtils.isEmpty(nodes)){
- return host;
- }
- List<Host> candidateHosts = new ArrayList<>(nodes.size());
- nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
-
- /**
- * select
- */
- return selector.select(candidateHosts);
+ public Host select(Collection<Host> nodes) {
+ return selector.select(nodes);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
new file mode 100644
index 0000000..145393e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+/**
+ * host selector
+ */
+public enum HostSelector {
+
+    RANDOM,
+
+    ROUNDROBIN,
+
+    LOWERWEIGHT;
+
+    /**
+     * look up a host selector by its name, ignoring case
+     *
+     * @param selector selector name, compared case-insensitively with the constant names
+     * @return the host selector whose name matches
+     * @throws IllegalArgumentException if no constant matches (a null name never matches)
+     */
+    public static HostSelector of(String selector){
+        final HostSelector[] candidates = values();
+        for (int index = 0; index < candidates.length; index++) {
+            final HostSelector candidate = candidates[index];
+            if (!candidate.name().equalsIgnoreCase(selector)) {
+                continue;
+            }
+            return candidate;
+        }
+        throw new IllegalArgumentException("invalid host selector : " + selector);
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
new file mode 100644
index 0000000..ebceea7
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * host weight: pairs a host with a fixed weight computed from its cpu, memory
+ * and load average values, plus a mutable current weight
+ */
+public class HostWeight {
+
+    /**
+     * multiplier applied to the cpu value
+     */
+    private static final int CPU_FACTOR = 10;
+
+    /**
+     * multiplier applied to the memory value
+     */
+    private static final int MEMORY_FACTOR = 20;
+
+    /**
+     * multiplier applied to the load average value
+     */
+    private static final int LOAD_AVERAGE_FACTOR = 70;
+
+    /**
+     * host this weight belongs to
+     */
+    private final Host host;
+
+    /**
+     * weight computed once at construction, never changed afterwards
+     */
+    private final int weight;
+
+    /**
+     * mutable weight, initialised to {@link #weight}
+     */
+    private int currentWeight;
+
+    /**
+     * create a host weight; the current weight starts equal to the computed weight
+     *
+     * @param host host
+     * @param cpu cpu value
+     * @param memory memory value
+     * @param loadAverage load average value
+     */
+    public HostWeight(Host host, double cpu, double memory, double loadAverage) {
+        this.weight = calculateWeight(cpu, memory, loadAverage);
+        this.host = host;
+        this.currentWeight = weight;
+    }
+
+    /**
+     * @return the current (mutable) weight
+     */
+    public int getCurrentWeight() {
+        return currentWeight;
+    }
+
+    /**
+     * @return the weight computed at construction
+     */
+    public int getWeight() {
+        return weight;
+    }
+
+    /**
+     * @param currentWeight new current weight
+     */
+    public void setCurrentWeight(int currentWeight) {
+        this.currentWeight = currentWeight;
+    }
+
+    /**
+     * @return the host
+     */
+    public Host getHost() {
+        return host;
+    }
+
+    @Override
+    public String toString() {
+        return "HostWeight{" +
+                "host=" + host +
+                ", weight=" + weight +
+                ", currentWeight=" + currentWeight +
+                '}';
+    }
+
+    /**
+     * weighted sum of the three values, truncated to an int
+     *
+     * @param cpu cpu value
+     * @param memory memory value
+     * @param loadAverage load average value
+     * @return cpu * 10 + memory * 20 + loadAverage * 70, cast to int
+     */
+    private static int calculateWeight(double cpu, double memory, double loadAverage){
+        return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
new file mode 100644
index 0000000..cadf418
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import java.util.Collection;
+
+/**
+ * lower weight round robin: picks the host whose current weight is lowest
+ */
+public class LowerWeightRoundRobin implements Selector<HostWeight>{
+
+    /**
+     * select the host weight with the lowest current weight.
+     * Every call first raises each candidate's current weight by its own weight,
+     * then raises the chosen candidate's current weight by the sum of all weights,
+     * so the same candidate is not picked on every call.
+     * NOTE(review): current weights only ever grow; if HostWeight instances are
+     * long-lived the int could eventually overflow - confirm against callers.
+     *
+     * @param sources candidate host weights; their current weights are mutated
+     * @return the selected host weight
+     * @throws IllegalArgumentException if sources is null or empty
+     */
+    @Override
+    public HostWeight select(Collection<HostWeight> sources){
+        // fail fast with a descriptive error instead of a NullPointerException below
+        if (sources == null || sources.isEmpty()) {
+            throw new IllegalArgumentException("Empty source.");
+        }
+        int totalWeight = 0;
+        int lowWeight = 0;
+        HostWeight lowerNode = null;
+        for (HostWeight hostWeight : sources) {
+            totalWeight += hostWeight.getWeight();
+            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
+            // strict comparison: on a tie the earlier candidate is kept
+            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) {
+                lowerNode = hostWeight;
+                lowWeight = hostWeight.getCurrentWeight();
+            }
+        }
+        // penalise the winner so the other candidates get picked on later calls
+        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
+        return lowerNode;
+    }
+}
+
+
+
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 590a25f..25355e2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -31,6 +31,7 @@ import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
@@ -227,6 +228,10 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
+ /**
+  * get all worker group nodes
+  * @return unmodifiable view of the worker group nodes map (the map is wrapped, not copied)
+  */
+ public Map<String, Set<String>> getWorkerGroupNodes(){
+     final Map<String, Set<String>> readOnlyView = Collections.unmodifiableMap(workerGroupNodes);
+     return readOnlyView;
+ }
+
/**
* get worker group nodes
* @param workerGroup
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
new file mode 100644
index 0000000..10936a6
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+/**
+ * lower weight round robin test
+ */
+public class LowerWeightRoundRobinTest {
+
+    /**
+     * the first picks must follow ascending weight, and every later pick must be non-null
+     */
+    @Test
+    public void testSelect(){
+        // with factors 10/20/70 these truncate to weights 278, 238 and 237
+        HostWeight heaviest = new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84);
+        HostWeight middle = new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24);
+        HostWeight lightest = new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15);
+        Assert.assertEquals(278, heaviest.getWeight());
+        Assert.assertEquals(238, middle.getWeight());
+        Assert.assertEquals(237, lightest.getWeight());
+
+        Collection<HostWeight> sources = new ArrayList<>();
+        sources.add(heaviest);
+        sources.add(middle);
+        sources.add(lightest);
+
+        LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
+        // each winner is penalised by the total weight, so the next-lowest wins the next round
+        Assert.assertSame(lightest, roundRobin.select(sources));
+        Assert.assertSame(middle, roundRobin.select(sources));
+        Assert.assertSame(heaviest, roundRobin.select(sources));
+
+        for(int i = 0; i < 100; i ++){
+            Assert.assertNotNull(roundRobin.select(sources));
+        }
+    }
+}