You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:36 UTC

[dolphinscheduler] 09/29: Remove the schedule thread in LowerWeightHostManager (#10310)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 9a28d320570e9c6768382584a97ab9d2d9225d96
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jun 21 13:30:07 2022 +0800

    Remove the schedule thread in LowerWeightHostManager (#10310)
    
    (cherry picked from commit b100f6c4890fb8db8887e24891c6af963c582360)
---
 .../dispatch/host/LowerWeightHostManager.java      | 136 ++++++++++-----------
 .../server/master/registry/ServerNodeManager.java  |  22 ++++
 .../master/registry/WorkerInfoChangeListener.java} |  37 ++----
 3 files changed, 100 insertions(+), 95 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 646d770b01..f7c26ae765 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
+import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -36,14 +36,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,23 +66,12 @@ public class LowerWeightHostManager extends CommonHostManager {
      */
     private Lock lock;
 
-    /**
-     * executor service
-     */
-    private ScheduledExecutorService executorService;
-
     @PostConstruct
     public void init() {
         this.selector = new LowerWeightRoundRobin();
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
         this.lock = new ReentrantLock();
-        this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
-        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS);
-    }
-
-    @PreDestroy
-    public void close() {
-        this.executorService.shutdownNow();
+        serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener());
     }
 
     /**
@@ -109,6 +94,69 @@ public class LowerWeightHostManager extends CommonHostManager {
         throw new UnsupportedOperationException("not support");
     }
 
+
+    private class WorkerWeightListener implements WorkerInfoChangeListener {
+        @Override
+        public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
+            syncWorkerResources(workerGroups, workerNodeInfo);
+        }
+    }
+
+    /**
+     * Sync worker resource.
+     *
+     * @param workerGroupNodes  worker group nodes, key is worker group, value is worker group nodes.
+     * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
+     */
+    private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
+                                     final Map<String, String> workerNodeInfoMap) {
+        try {
+            Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
+            for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
+                String workerGroup = entry.getKey();
+                Set<String> nodes = entry.getValue();
+                Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+                for (String node : nodes) {
+                    String heartbeat = workerNodeInfoMap.getOrDefault(node, null);
+                    Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
+                    hostWeightOpt.ifPresent(hostWeights::add);
+                }
+                if (!hostWeights.isEmpty()) {
+                    workerHostWeights.put(workerGroup, hostWeights);
+                }
+            }
+            syncWorkerHostWeight(workerHostWeights);
+        } catch (Throwable ex) {
+            logger.error("Sync worker resource error", ex);
+        }
+    }
+
+    private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
+        if (StringUtils.isEmpty(heartBeatInfo)) {
+            logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
+            return Optional.empty();
+        }
+        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+        if (heartBeat == null) {
+            return Optional.empty();
+        }
+        if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
+            logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
+                    addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
+            return Optional.empty();
+        }
+        if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+            logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
+                    addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
+            return Optional.empty();
+        }
+        return Optional.of(
+            new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
+                           heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
+                           heartBeat.getStartupTime()));
+    }
+
+
     private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
         lock.lock();
         try {
@@ -128,58 +176,4 @@ public class LowerWeightHostManager extends CommonHostManager {
         }
     }
 
-    class RefreshResourceTask implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
-                Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes();
-                for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
-                    String workerGroup = entry.getKey();
-                    Set<String> nodes = entry.getValue();
-                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
-                    for (String node : nodes) {
-                        String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
-                        Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
-                        if (hostWeightOpt.isPresent()) {
-                            hostWeights.add(hostWeightOpt.get());
-                        }
-                    }
-                    if (!hostWeights.isEmpty()) {
-                        workerHostWeights.put(workerGroup, hostWeights);
-                    }
-                }
-                syncWorkerHostWeight(workerHostWeights);
-            } catch (Throwable ex) {
-                logger.error("RefreshResourceTask error", ex);
-            }
-        }
-
-        public Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
-            if (StringUtils.isEmpty(heartBeatInfo)) {
-                logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
-                return Optional.empty();
-            }
-            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
-            if (heartBeat == null) {
-                return Optional.empty();
-            }
-            if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
-                logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
-                        addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
-                return Optional.empty();
-            }
-            if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
-                logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
-                        addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
-                return Optional.empty();
-            }
-            return Optional.of(
-                    new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
-                            heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
-                            heartBeat.getStartupTime()));
-        }
-    }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index a6599b6032..0c550a1ee3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -131,6 +132,8 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     private MasterConfig masterConfig;
 
+    private List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
+
     private static volatile int MASTER_SLOT = 0;
 
     private static volatile int MASTER_SIZE = 0;
@@ -217,6 +220,7 @@ public class ServerNodeManager implements InitializingBean {
                         }
                     }
                 }
+                notifyWorkerInfoChangeListeners();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
             }
@@ -256,6 +260,7 @@ public class ServerNodeManager implements InitializingBean {
                         String node = parseNode(path);
                         syncSingleWorkerNodeInfo(node, data);
                     }
+                    notifyWorkerInfoChangeListeners();
                 } catch (IllegalArgumentException ex) {
                     logger.warn(ex.getMessage());
                 } catch (Exception ex) {
@@ -457,6 +462,23 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     * Add a worker info change listener; the listener will be notified when the worker info changes.
+     *
+     * @param listener the listener to trigger when the worker node info changes.
+     */
+    public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
+        workerInfoChangeListeners.add(listener);
+    }
+
+    private void notifyWorkerInfoChangeListeners() {
+        Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
+        Map<String, String> workerNodeInfo = getWorkerNodeInfo();
+        for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
+            listener.notify(workerGroupNodes, workerNodeInfo);
+        }
+    }
+
     /**
      * destroy
      */
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
similarity index 50%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
index 5e35b0b2ec..f885a6fba0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
@@ -15,33 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.registry;
 
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Map;
+import java.util.Set;
 
 /**
- * RefreshResourceTask test
+ * The listener used in {@link ServerNodeManager} to notify the change of worker info.
  */
-@RunWith(MockitoJUnitRunner.class)
-public class RefreshResourceTaskTest {
-
-    @Mock
-    private ServerNodeManager serverNodeManager;
+public interface WorkerInfoChangeListener {
 
-    @InjectMocks
-    LowerWeightHostManager lowerWeightHostManager;
+    /**
+     * Used to notify the change of worker info.
+     *
+     * @param workerGroups   worker groups map, key is worker group name, value is the set of worker addresses.
+     * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
+     */
+    void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
 
-    @Test
-    public void testGetHostWeightWithResult() {
-        Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask()
-            .getHostWeight("192.168.1.1:22", "default", null)
-            .isPresent());
-    }
-}
\ No newline at end of file
+}