You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:36 UTC
[dolphinscheduler] 09/29: Remove the schedule thread in LowerWeightHostManager (#10310)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 9a28d320570e9c6768382584a97ab9d2d9225d96
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jun 21 13:30:07 2022 +0800
Remove the schedule thread in LowerWeightHostManager (#10310)
(cherry picked from commit b100f6c4890fb8db8887e24891c6af963c582360)
---
.../dispatch/host/LowerWeightHostManager.java | 136 ++++++++++-----------
.../server/master/registry/ServerNodeManager.java | 22 ++++
.../master/registry/WorkerInfoChangeListener.java} | 37 ++----
3 files changed, 100 insertions(+), 95 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 646d770b01..f7c26ae765 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -36,14 +36,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,23 +66,12 @@ public class LowerWeightHostManager extends CommonHostManager {
*/
private Lock lock;
- /**
- * executor service
- */
- private ScheduledExecutorService executorService;
-
@PostConstruct
public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
- this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
- this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS);
- }
-
- @PreDestroy
- public void close() {
- this.executorService.shutdownNow();
+ serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener());
}
/**
@@ -109,6 +94,69 @@ public class LowerWeightHostManager extends CommonHostManager {
throw new UnsupportedOperationException("not support");
}
+
+    /**
+     * Adapter registered with {@code ServerNodeManager}: each worker-info notification is
+     * forwarded to {@code syncWorkerResources} so the host weights are rebuilt from the snapshot.
+     */
+    private final class WorkerWeightListener implements WorkerInfoChangeListener {
+
+        @Override
+        public void notify(Map<String, Set<String>> groupToNodes, Map<String, String> nodeToHeartbeat) {
+            LowerWeightHostManager.this.syncWorkerResources(groupToNodes, nodeToHeartbeat);
+        }
+    }
+
+    /**
+     * Rebuild the host weights of every worker group from a snapshot of worker info and hand
+     * the result to {@link #syncWorkerHostWeight(Map)}.
+     *
+     * <p>Each node is converted through {@code getHostWeight}; nodes it rejects are skipped, and a
+     * group left without any usable node is omitted from the result. Any error is logged instead
+     * of being propagated to the notifier.
+     *
+     * @param workerGroupNodes  worker group name -> addresses of the nodes in that group
+     * @param workerNodeInfoMap worker address -> heartbeat info string of that node
+     */
+    private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
+                                     final Map<String, String> workerNodeInfoMap) {
+        try {
+            final Map<String, Set<HostWeight>> weightsByGroup = new HashMap<>();
+            workerGroupNodes.forEach((groupName, groupNodes) -> {
+                final Set<HostWeight> usable = new HashSet<>(groupNodes.size());
+                for (String nodeAddr : groupNodes) {
+                    // a missing entry yields null, which getHostWeight treats as "no heartbeat yet"
+                    getHostWeight(nodeAddr, groupName, workerNodeInfoMap.get(nodeAddr)).ifPresent(usable::add);
+                }
+                if (!usable.isEmpty()) {
+                    weightsByGroup.put(groupName, usable);
+                }
+            });
+            syncWorkerHostWeight(weightsByGroup);
+        } catch (Throwable ex) {
+            logger.error("Sync worker resource error", ex);
+        }
+    }
+
+    /**
+     * Convert the heartbeat of a single worker into a {@link HostWeight} if the worker can accept tasks.
+     *
+     * @param addr          worker address
+     * @param workerGroup   worker group the address belongs to
+     * @param heartBeatInfo encoded heartbeat of the worker; may be {@code null} or empty
+     * @return the host weight, or {@link Optional#empty()} when the heartbeat is missing or cannot be
+     *     decoded, or when the worker reports an abnormal or busy status (each case is logged)
+     */
+    private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
+        if (StringUtils.isEmpty(heartBeatInfo)) {
+            logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
+            return Optional.empty();
+        }
+        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+        if (heartBeat == null) {
+            // Log like the other rejection paths; otherwise an undecodable heartbeat would silently
+            // exclude the worker from the computed host weights.
+            logger.warn("worker {} in work group {} has an undecodable heartbeat: {}", addr, workerGroup, heartBeatInfo);
+            return Optional.empty();
+        }
+        if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
+            logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
+                    addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
+            return Optional.empty();
+        }
+        if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+            logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
+                    addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
+            return Optional.empty();
+        }
+        return Optional.of(
+                new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
+                        heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
+                        heartBeat.getStartupTime()));
+    }
+
+
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
try {
@@ -128,58 +176,4 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
- class RefreshResourceTask implements Runnable {
-
- @Override
- public void run() {
- try {
- Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
- Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes();
- for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
- String workerGroup = entry.getKey();
- Set<String> nodes = entry.getValue();
- Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
- for (String node : nodes) {
- String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
- Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
- if (hostWeightOpt.isPresent()) {
- hostWeights.add(hostWeightOpt.get());
- }
- }
- if (!hostWeights.isEmpty()) {
- workerHostWeights.put(workerGroup, hostWeights);
- }
- }
- syncWorkerHostWeight(workerHostWeights);
- } catch (Throwable ex) {
- logger.error("RefreshResourceTask error", ex);
- }
- }
-
- public Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
- if (StringUtils.isEmpty(heartBeatInfo)) {
- logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
- return Optional.empty();
- }
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
- if (heartBeat == null) {
- return Optional.empty();
- }
- if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
- logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
- addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
- return Optional.empty();
- }
- if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
- logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
- addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
- return Optional.empty();
- }
- return Optional.of(
- new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
- heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
- heartBeat.getStartupTime()));
- }
- }
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index a6599b6032..0c550a1ee3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -131,6 +132,8 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private MasterConfig masterConfig;
+ private List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
+
private static volatile int MASTER_SLOT = 0;
private static volatile int MASTER_SIZE = 0;
@@ -217,6 +220,7 @@ public class ServerNodeManager implements InitializingBean {
}
}
}
+ notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
}
@@ -256,6 +260,7 @@ public class ServerNodeManager implements InitializingBean {
String node = parseNode(path);
syncSingleWorkerNodeInfo(node, data);
}
+ notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
@@ -457,6 +462,23 @@ public class ServerNodeManager implements InitializingBean {
}
}
+    /**
+     * Register a listener that is notified after worker node info or worker groups are synced.
+     *
+     * @param listener the listener to trigger when the worker node info changes
+     */
+    public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
+        workerInfoChangeListeners.add(listener);
+    }
+
+    /**
+     * Push the current worker groups and worker node info to every registered listener.
+     *
+     * <p>The backing list is a plain {@link ArrayList} guarded by this object's monitor (see
+     * {@link #addWorkerInfoChangeListener}). This method is invoked both from the DB sync task and
+     * from worker node event handling, which presumably run on different threads. It therefore
+     * iterates over a snapshot copied under that monitor instead of the live list. Listeners are
+     * called outside the lock, and a failing listener is logged without preventing the others
+     * from being notified or leaking into the caller's error handling.
+     */
+    private void notifyWorkerInfoChangeListeners() {
+        final List<WorkerInfoChangeListener> listeners;
+        synchronized (this) {
+            listeners = new ArrayList<>(workerInfoChangeListeners);
+        }
+        Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
+        Map<String, String> workerNodeInfo = getWorkerNodeInfo();
+        for (WorkerInfoChangeListener listener : listeners) {
+            try {
+                listener.notify(workerGroupNodes, workerNodeInfo);
+            } catch (Exception ex) {
+                logger.error("Notify worker info change listener {} error", listener.getClass().getName(), ex);
+            }
+        }
+    }
+
/**
* destroy
*/
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
similarity index 50%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
index 5e35b0b2ec..f885a6fba0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
@@ -15,33 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.registry;
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Map;
+import java.util.Set;
/**
- * RefreshResourceTask test
+ * The listener used in {@link ServerNodeManager} to notify the change of worker info.
*/
-@RunWith(MockitoJUnitRunner.class)
-public class RefreshResourceTaskTest {
-
- @Mock
- private ServerNodeManager serverNodeManager;
+public interface WorkerInfoChangeListener {
- @InjectMocks
- LowerWeightHostManager lowerWeightHostManager;
+ /**
+ * Receive the latest snapshot of worker info after {@link ServerNodeManager} has synced it.
+ * Invoked synchronously by {@link ServerNodeManager} on the thread that performed the sync.
+ *
+ * @param workerGroups worker groups map, key is worker group name, value is the set of worker addresses in that group.
+ * @param workerNodeInfo worker node info map, key is worker address, value is that worker's heartbeat info string.
+ */
+ void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
- @Test
- public void testGetHostWeightWithResult() {
- Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask()
- .getHostWeight("192.168.1.1:22", "default", null)
- .isPresent());
- }
-}
\ No newline at end of file
+}