You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zt...@apache.org on 2020/12/09 11:49:09 UTC
[hadoop] branch trunk updated: YARN-10380: Import logic of
multi-node allocation in CapacityScheduler (#2494)
This is an automated email from the ASF dual-hosted git repository.
ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d67ccd0 YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)
d67ccd0 is described below
commit d67ccd03e36b3afdb9ce900460a16a6210444bff
Author: zhuqi <82...@qq.com>
AuthorDate: Wed Dec 9 19:48:39 2020 +0800
YARN-10380: Import logic of multi-node allocation in CapacityScheduler (#2494)
Contributed by Qi Zhu.
---
.../scheduler/ClusterNodeTracker.java | 11 ++
.../scheduler/capacity/CapacityScheduler.java | 116 +++++++++++++++------
.../TestCapacitySchedulerAsyncScheduling.java | 19 ++++
3 files changed, 115 insertions(+), 31 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index c39d57d..92b04d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -496,4 +496,15 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
return nodesPerPartition;
}
+
+ /** Returns a snapshot copy of the partition names currently tracked. */
+ public List<String> getPartitions() {
+ readLock.lock();
+ try {
+ // Copy under the read lock so callers can iterate without holding it.
+ return new ArrayList<>(nodesPerLabel.keySet());
+ } finally {
+ readLock.unlock();
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e25301b..51df224 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -531,6 +531,8 @@ public class CapacityScheduler extends
/**
* Schedule on all nodes by starting at a random point.
+ * Schedule on all partitions by starting at a random partition
+ * when multiNodePlacementEnabled is true.
* @param cs
*/
static void schedule(CapacityScheduler cs) throws InterruptedException{
@@ -544,44 +546,79 @@ public class CapacityScheduler extends
if(nodeSize == 0) {
return;
}
- int start = random.nextInt(nodeSize);
- // To avoid too verbose DEBUG logging, only print debug log once for
- // every 10 secs.
- boolean printSkipedNodeLogging = false;
- if (Time.monotonicNow() / 1000 % 10 == 0) {
- printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
- } else {
- printedVerboseLoggingForAsyncScheduling = false;
- }
+ if (!cs.multiNodePlacementEnabled) {
+ int start = random.nextInt(nodeSize);
+
+ // To avoid too verbose DEBUG logging, only print debug log once for
+ // every 10 secs.
+ boolean printSkipedNodeLogging = false;
+ if (Time.monotonicNow() / 1000 % 10 == 0) {
+ printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+ } else {
+ printedVerboseLoggingForAsyncScheduling = false;
+ }
+
+ // Allocate containers of node [start, end)
+ for (FiCaSchedulerNode node : nodes) {
+ if (current++ >= start) {
+ if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+ continue;
+ }
+ cs.allocateContainersToNode(node.getNodeID(), false);
+ }
+ }
- // Allocate containers of node [start, end)
- for (FiCaSchedulerNode node : nodes) {
- if (current++ >= start) {
+ current = 0;
+
+ // Allocate containers of node [0, start)
+ for (FiCaSchedulerNode node : nodes) {
+ if (current++ >= start) {
+ break;
+ }
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
- }
-
- current = 0;
- // Allocate containers of node [0, start)
- for (FiCaSchedulerNode node : nodes) {
- if (current++ > start) {
- break;
+ if (printSkipedNodeLogging) {
+ printedVerboseLoggingForAsyncScheduling = true;
}
- if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
- continue;
+ } else {
+ // Get all partitions
+ List<String> partitions = cs.nodeTracker.getPartitions();
+ int partitionSize = partitions.size();
+ // Random start point; nextInt(0) would throw if no partitions remain.
+ int start = partitionSize == 0 ? 0 : random.nextInt(partitionSize);
+ // Allocate containers of partition [start, end)
+ for (String partition : partitions) {
+ if (current++ >= start) {
+ CandidateNodeSet<FiCaSchedulerNode> candidates =
+ cs.getCandidateNodeSet(partition);
+ if (candidates == null) {
+ continue;
+ }
+ cs.allocateContainersToNode(candidates, false);
+ }
}
- cs.allocateContainersToNode(node.getNodeID(), false);
- }
- if (printSkipedNodeLogging) {
- printedVerboseLoggingForAsyncScheduling = true;
- }
+ current = 0;
+
+ // Allocate containers of partition [0, start)
+ for (String partition : partitions) {
+ if (current++ >= start) {
+ break;
+ }
+ CandidateNodeSet<FiCaSchedulerNode> candidates =
+ cs.getCandidateNodeSet(partition);
+ if (candidates == null) {
+ continue;
+ }
+ cs.allocateContainersToNode(candidates, false);
+ }
+ }
Thread.sleep(cs.getAsyncScheduleInterval());
}
@@ -1486,17 +1523,34 @@ public class CapacityScheduler extends
}
private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
- FiCaSchedulerNode node) {
+ String partition) {
+ Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+ List<FiCaSchedulerNode> nodes = nodeTracker
+ .getNodesPerPartition(partition);
+ if (nodes != null) {
+ // Filter out nodes whose heartbeat is too old to schedule on.
+ nodes.stream()
+ .filter(node -> !shouldSkipNodeSchedule(node, this, true))
+ .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+ }
+ // Null (not an empty set) when nothing is schedulable, so callers skip.
+ return nodesByPartition.isEmpty() ? null
+ : new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+ nodesByPartition, partition);
+ }
+
+ private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+ FiCaSchedulerNode node) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
candidates = new SimpleCandidateNodeSet<>(node);
if (multiNodePlacementEnabled) {
Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
List<FiCaSchedulerNode> nodes = nodeTracker
- .getNodesPerPartition(node.getPartition());
+ .getNodesPerPartition(node.getPartition());
if (nodes != null && !nodes.isEmpty()) {
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
- nodesByPartition, node.getPartition());
+ nodesByPartition, node.getPartition());
}
}
return candidates;
@@ -1513,8 +1567,8 @@ public class CapacityScheduler extends
int offswitchCount = 0;
int assignedContainers = 0;
- CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
- node);
+ CandidateNodeSet<FiCaSchedulerNode> candidates =
+ getCandidateNodeSet(node);
CSAssignment assignment = allocateContainersToNode(candidates,
withNodeHeartbeat);
// Only check if we can allocate more container on the same node when
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 59ab077..5f2bbf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -85,6 +85,10 @@ public class TestCapacitySchedulerAsyncScheduling {
private NMHeartbeatThread nmHeartbeatThread = null;
+ /** Multi-node lookup policy class wired in by the multi-node test. */
+ private static final String POLICY_CLASS_NAME =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler" +
+ ".placement.ResourceUsageMultiNodeLookupPolicy";
+
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
@@ -111,6 +115,21 @@ public class TestCapacitySchedulerAsyncScheduling {
testAsyncContainerAllocation(3);
}
+ @Test(timeout = 300000)
+ public void testAsyncContainerAllocationWithMultiNode() throws Exception {
+ // Configure a "resource-based" sorting policy backed by
+ // POLICY_CLASS_NAME, then switch on multi-node placement.
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+ "resource-based");
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+ "resource-based");
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ + ".resource-based" + ".class", POLICY_CLASS_NAME);
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+ testAsyncContainerAllocation(2);
+ }
+
public void testAsyncContainerAllocation(int numThreads) throws Exception {
conf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org