You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zt...@apache.org on 2020/12/09 11:49:09 UTC

[hadoop] branch trunk updated: YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)

This is an automated email from the ASF dual-hosted git repository.

ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d67ccd0  YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)
d67ccd0 is described below

commit d67ccd03e36b3afdb9ce900460a16a6210444bff
Author: zhuqi <82...@qq.com>
AuthorDate: Wed Dec 9 19:48:39 2020 +0800

    YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)
    
    Contributed by Qi Zhu.
---
 .../scheduler/ClusterNodeTracker.java              |  11 ++
 .../scheduler/capacity/CapacityScheduler.java      | 116 +++++++++++++++------
 .../TestCapacitySchedulerAsyncScheduling.java      |  19 ++++
 3 files changed, 115 insertions(+), 31 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index c39d57d..92b04d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -496,4 +496,15 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     }
     return nodesPerPartition;
   }
+
+  public List<String> getPartitions() {
+    List<String> partitions = null;
+    readLock.lock();
+    try {
+      partitions = new ArrayList(nodesPerLabel.keySet());
+    } finally {
+      readLock.unlock();
+    }
+    return partitions;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e25301b..51df224 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -531,6 +531,8 @@ public class CapacityScheduler extends
 
   /**
    * Schedule on all nodes by starting at a random point.
+   * When multiNodePlacementEnabled is true, schedule on all partitions
+   * instead, starting at a random partition.
    * @param cs
    */
   static void schedule(CapacityScheduler cs) throws InterruptedException{
@@ -544,44 +546,79 @@ public class CapacityScheduler extends
     if(nodeSize == 0) {
       return;
     }
-    int start = random.nextInt(nodeSize);
 
-    // To avoid too verbose DEBUG logging, only print debug log once for
-    // every 10 secs.
-    boolean printSkipedNodeLogging = false;
-    if (Time.monotonicNow() / 1000 % 10 == 0) {
-      printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
-    } else {
-      printedVerboseLoggingForAsyncScheduling = false;
-    }
+    if (!cs.multiNodePlacementEnabled) {
+      int start = random.nextInt(nodeSize);
+
+      // To avoid too verbose DEBUG logging, only print debug log once for
+      // every 10 secs.
+      boolean printSkipedNodeLogging = false;
+      if (Time.monotonicNow() / 1000 % 10 == 0) {
+        printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+      } else {
+        printedVerboseLoggingForAsyncScheduling = false;
+      }
+
+      // Allocate containers of node [start, end)
+      for (FiCaSchedulerNode node : nodes) {
+        if (current++ >= start) {
+          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+            continue;
+          }
+          cs.allocateContainersToNode(node.getNodeID(), false);
+        }
+      }
 
-    // Allocate containers of node [start, end)
-    for (FiCaSchedulerNode node : nodes) {
-      if (current++ >= start) {
+      current = 0;
+
+      // Allocate containers of node [0, start)
+      for (FiCaSchedulerNode node : nodes) {
+        if (current++ > start) {
+          break;
+        }
         if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
           continue;
         }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
-    }
-
-    current = 0;
 
-    // Allocate containers of node [0, start)
-    for (FiCaSchedulerNode node : nodes) {
-      if (current++ > start) {
-        break;
+      if (printSkipedNodeLogging) {
+        printedVerboseLoggingForAsyncScheduling = true;
       }
-      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
-        continue;
+    } else {
+      // Get all partitions
+      List<String> partitions = cs.nodeTracker.getPartitions();
+      int partitionSize = partitions.size();
+      // First randomize the start point
+      int start = random.nextInt(partitionSize);
+      // Allocate containers of partition [start, end)
+      for (String partititon : partitions) {
+        if (current++ >= start) {
+          CandidateNodeSet<FiCaSchedulerNode> candidates =
+                  cs.getCandidateNodeSet(partititon);
+          if (candidates == null) {
+            continue;
+          }
+          cs.allocateContainersToNode(candidates, false);
+        }
       }
-      cs.allocateContainersToNode(node.getNodeID(), false);
-    }
 
-    if (printSkipedNodeLogging) {
-      printedVerboseLoggingForAsyncScheduling = true;
-    }
+      current = 0;
+
+      // Allocate containers of partition [0, start)
+      for (String partititon : partitions) {
+        if (current++ > start) {
+          break;
+        }
+        CandidateNodeSet<FiCaSchedulerNode> candidates =
+                cs.getCandidateNodeSet(partititon);
+        if (candidates == null) {
+          continue;
+        }
+        cs.allocateContainersToNode(candidates, false);
+      }
 
+    }
     Thread.sleep(cs.getAsyncScheduleInterval());
   }
 
@@ -1486,17 +1523,34 @@ public class CapacityScheduler extends
   }
 
   private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
-      FiCaSchedulerNode node) {
+          String partition) {
+    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+    Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+    List<FiCaSchedulerNode> nodes = nodeTracker
+            .getNodesPerPartition(partition);
+    if (nodes != null && !nodes.isEmpty()) {
+      // Filter out nodes whose heartbeat has been missing for too long
+      nodes.stream()
+              .filter(node -> !shouldSkipNodeSchedule(node, this, true))
+              .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+      candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+              nodesByPartition, partition);
+    }
+    return candidates;
+  }
+
+  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+          FiCaSchedulerNode node) {
     CandidateNodeSet<FiCaSchedulerNode> candidates = null;
     candidates = new SimpleCandidateNodeSet<>(node);
     if (multiNodePlacementEnabled) {
       Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
       List<FiCaSchedulerNode> nodes = nodeTracker
-          .getNodesPerPartition(node.getPartition());
+              .getNodesPerPartition(node.getPartition());
       if (nodes != null && !nodes.isEmpty()) {
         nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
         candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
-            nodesByPartition, node.getPartition());
+                nodesByPartition, node.getPartition());
       }
     }
     return candidates;
@@ -1513,8 +1567,8 @@ public class CapacityScheduler extends
       int offswitchCount = 0;
       int assignedContainers = 0;
 
-      CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
-          node);
+      CandidateNodeSet<FiCaSchedulerNode> candidates =
+              getCandidateNodeSet(node);
       CSAssignment assignment = allocateContainersToNode(candidates,
           withNodeHeartbeat);
       // Only check if we can allocate more container on the same node when
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 59ab077..5f2bbf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -85,6 +85,10 @@ public class TestCapacitySchedulerAsyncScheduling {
 
   private NMHeartbeatThread nmHeartbeatThread = null;
 
+  private static final String POLICY_CLASS_NAME =
+          "org.apache.hadoop.yarn.server.resourcemanager.scheduler" +
+                  ".placement.ResourceUsageMultiNodeLookupPolicy";
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -111,6 +115,21 @@ public class TestCapacitySchedulerAsyncScheduling {
     testAsyncContainerAllocation(3);
   }
 
+  @Test(timeout = 300000)
+  public void testAsyncContainerAllocationWithMultiNode() throws Exception {
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+            "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+            "resource-based");
+    String policyName =
+            CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+                    + ".resource-based" + ".class";
+    conf.set(policyName, POLICY_CLASS_NAME);
+    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+            true);
+    testAsyncContainerAllocation(2);
+  }
+
   public void testAsyncContainerAllocation(int numThreads) throws Exception {
     conf.setInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org