You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2020/06/12 14:11:15 UTC
[hadoop] branch trunk updated: YARN-10293. Reserved Containers not
allocated from available space of other nodes in CandidateNodeSet for
MultiNodePlacement. Contributed by Prabhu Joseph.
This is an automated email from the ASF dual-hosted git repository.
taoyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7c4de59 YARN-10293. Reserved Containers not allocated from available space of other nodes in CandidateNodeSet for MultiNodePlacement. Contributed by Prabhu Joseph.
7c4de59 is described below
commit 7c4de59fc10953170bbef9a320ce70bcddae8bba
Author: Tao Yang <ta...@apache.org>
AuthorDate: Fri Jun 12 22:10:25 2020 +0800
YARN-10293. Reserved Containers not allocated from available space of other nodes in CandidateNodeSet for MultiNodePlacement. Contributed by Prabhu Joseph.
---
.../scheduler/capacity/CapacityScheduler.java | 28 +--
.../capacity/TestCapacitySchedulerMultiNodes.java | 7 -
...tCapacitySchedulerMultiNodesWithPreemption.java | 271 +++++++++++++++++++++
3 files changed, 276 insertions(+), 30 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5cef57a..a6aa824 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1721,31 +1721,13 @@ public class CapacityScheduler extends
*/
private CSAssignment allocateContainersOnMultiNodes(
CandidateNodeSet<FiCaSchedulerNode> candidates) {
- // When this time look at multiple nodes, try schedule if the
- // partition has any available resource or killable resource
- if (getRootQueue().getQueueCapacities().getUsedCapacity(
- candidates.getPartition()) >= 1.0f
- && preemptionManager.getKillableResource(
- CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
- == Resources.none()) {
- // Try to allocate from reserved containers
- for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- allocateFromReservedContainer(node, false, reservedContainer);
- }
+ // Try to allocate from reserved containers
+ for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ allocateFromReservedContainer(node, false, reservedContainer);
}
- LOG.debug("This partition '{}' doesn't have available or "
- + "killable resource", candidates.getPartition());
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, null,
- "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
- ActivityDiagnosticConstant.
- INIT_CHECK_PARTITION_RESOURCE_INSUFFICIENT);
- ActivitiesLogger.NODE
- .finishSkippedNodeAllocation(activitiesManager, null);
- return null;
}
-
return allocateOrReserveNewContainers(candidates, false);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index bb2cbfd..29de815 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -258,13 +258,6 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Trigger scheduling to allocate a container on nm1 for app2.
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
- Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(7 * GB,
cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
new file mode 100644
index 0000000..65e0a17
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestCapacitySchedulerMultiNodesWithPreemption
+ extends CapacitySchedulerTestBase {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestCapacitySchedulerMultiNodesWithPreemption.class);
+ private CapacitySchedulerConfiguration conf;
+ private static final String POLICY_CLASS_NAME =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
+ + "ResourceUsageMultiNodeLookupPolicy";
+
+ @Before
+ public void setUp() {
+ CapacitySchedulerConfiguration config =
+ new CapacitySchedulerConfiguration();
+ config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+ conf = new CapacitySchedulerConfiguration(config);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+ "resource-based");
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+ "resource-based");
+ String policyName =
+ CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ + ".resource-based" + ".class";
+ conf.set(policyName, POLICY_CLASS_NAME);
+ conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+ true);
+ // Set this to avoid the AM pending issue
+ conf.set(CapacitySchedulerConfiguration
+ .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, "1");
+ conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
+ conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
+ conf.setInt("yarn.scheduler.maximum-allocation-mb", 102400);
+
+ // Configure two queues to test Preemption
+ conf.set("yarn.scheduler.capacity.root.queues", "A, default");
+ conf.set("yarn.scheduler.capacity.root.A.capacity", "50");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "50");
+ conf.set("yarn.scheduler.capacity.root.A.maximum-capacity", "100");
+ conf.set("yarn.scheduler.capacity.root.default.maximum-capacity", "100");
+ conf.set("yarn.scheduler.capacity.root.A.user-limit-factor", "10");
+ conf.set("yarn.scheduler.capacity.root.default.user-limit-factor", "10");
+
+ // Configure Preemption
+ conf.setLong(
+ CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+ conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+ 1500);
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ 1.0f);
+ conf.setFloat(
+ CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+ 1.0f);
+
+ conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ }
+
+ @Test(timeout=60000)
+ public void testAllocateReservationFromOtherNode() throws Exception {
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ MockNM[] nms = new MockNM[3];
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 1 * GB, 2);
+ nms[0] = nm1;
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * GB, 2);
+ nms[1] = nm2;
+ MockNM nm3 = rm.registerNode("127.0.0.3:1234", 3 * GB, 2);
+ nms[2] = nm3;
+
+ MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+ .getMultiNodeSortingManager();
+ MultiNodeSorter<SchedulerNode> sorter = mns
+ .getMultiNodePolicy(POLICY_CLASS_NAME);
+ sorter.reSortClusterNodes();
+
+ // Step 1: Launch an App in Default Queue which utilizes the entire cluster
+ RMApp app1 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm)
+ .withAppName("app-1")
+ .withUser("user1")
+ .withAcls(null)
+ .withQueue("default")
+ .build());
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ am1.allocateAndWaitForContainers("*", 1, 2 * GB, nm2);
+ am1.allocateAndWaitForContainers("*", 1, 1 * GB, nm3);
+
+ // Step 2: Wait till the nodes utilization are full
+ GenericTestUtils.waitFor(() -> {
+ SchedulerNodeReport reportNM1 =
+ rm.getResourceScheduler().getNodeReport(nms[0].getNodeId());
+ SchedulerNodeReport reportNM2 =
+ rm.getResourceScheduler().getNodeReport(nms[1].getNodeId());
+ return (reportNM1.getAvailableResource().getMemorySize() == 0 * GB)
+ && (reportNM2.getAvailableResource().getMemorySize() == 0 * GB);
+ }, 10, 10000);
+
+
+ // Step 3: Launch another App in Queue A which will be Reserved
+ // after Preemption
+ final AtomicBoolean result = new AtomicBoolean(false);
+ RMApp app2 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+ .withAppName("app-2")
+ .withUser("user2")
+ .withAcls(null)
+ .withQueue("A")
+ .build());
+
+ // Launch AM in a thread and in parallel free the preempted node's
+ // unallocated resources in main thread
+ Thread t1 = new Thread() {
+ public void run() {
+ try {
+ MockAM am2 = MockRM.launchAM(app2, rm, nm1);
+ result.set(true);
+ } catch (Exception e) {
+ Assert.fail("Failed to launch app-2");
+ }
+ }
+ };
+ t1.start();
+
+ // Step 4: Wait for Preemption to happen. It will preempt Node1 (1GB)
+ // Get the node where preemption happened which has the available space
+ final AtomicReference<MockNM> preemptedNode = new AtomicReference<>();
+ GenericTestUtils.waitFor(() -> {
+ for (int i = 0; i < nms.length; i++) {
+ SchedulerNodeReport reportNM =
+ rm.getResourceScheduler().getNodeReport(nms[i].getNodeId());
+ if (reportNM.getAvailableResource().getMemorySize() == 1 * GB) {
+ preemptedNode.set(nms[i]);
+ return true;
+ }
+ }
+ return false;
+ }, 10, 30000);
+ LOG.info("Preempted node is: " + preemptedNode.get().getNodeId());
+
+
+ // Step 5: Don't release the container from NodeManager so that Reservation
+ // happens. Used Capacity will be < 1.0f but nodes won't have available
+ // containers so Reservation will happen.
+ FiCaSchedulerNode schedulerNode =
+ ((CapacityScheduler) rm.getResourceScheduler())
+ .getNodeTracker().getNode(preemptedNode.get().getNodeId());
+ Resource curResource = schedulerNode.getUnallocatedResource();
+ schedulerNode.deductUnallocatedResource(Resource.newInstance(curResource));
+
+ ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+ .removeNode(preemptedNode.get().getNodeId());
+ ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+ .addNode(schedulerNode);
+
+ // Send a heartbeat to kick the tires on the Scheduler
+ // The container will be reserved for app-2
+ RMNode preemptedRMNode = rm.getRMContext().getRMNodes().get(
+ preemptedNode.get().getNodeId());
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(
+ preemptedRMNode);
+ rm.getResourceScheduler().handle(nodeUpdate);
+
+ // Validate if Reservation happened
+ // Reservation will happen on last node in the iterator - Node3
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ ApplicationAttemptId app2AttemptId = app2.getCurrentAppAttempt()
+ .getAppAttemptId();
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(app2AttemptId);
+
+ assertEquals("App2 failed to get reserved container", 1,
+ schedulerApp.getReservedContainers().size());
+ LOG.info("Reserved node is: " +
+ schedulerApp.getReservedContainers().get(0).getReservedNode());
+ assertNotEquals("Failed to reserve as per the Multi Node Itearor",
+ schedulerApp.getReservedContainers().get(0).getReservedNode(),
+ preemptedNode.get().getNodeId());
+
+
+ // Step 6: Okay, now preempted node is Node1 and reserved node is Node3
+ // Validate if the Reserved Container gets allocated
+ // after updating release container.
+ schedulerNode = ((CapacityScheduler) rm.getResourceScheduler())
+ .getNodeTracker().getNode(preemptedNode.get().getNodeId());
+ curResource = schedulerNode.getAllocatedResource();
+ schedulerNode.updateTotalResource(
+ Resources.add(schedulerNode.getTotalResource(), curResource));
+
+ ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+ .removeNode(preemptedNode.get().getNodeId());
+ ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+ .addNode(schedulerNode);
+
+ preemptedRMNode = rm.getRMContext().getRMNodes().get(
+ preemptedNode.get().getNodeId());
+ nodeUpdate = new NodeUpdateSchedulerEvent(preemptedRMNode);
+ rm.getResourceScheduler().handle(nodeUpdate);
+
+ // Step 7: Wait for app-2 to get ALLOCATED
+ GenericTestUtils.waitFor(() -> {
+ return result.get();
+ }, 10, 20000);
+
+ // Step 8: Validate if app-2 has got 1 live container and
+ // released the reserved container
+ schedulerApp = cs.getApplicationAttempt(app2AttemptId);
+ assertEquals("App2 failed to get Allocated", 1,
+ schedulerApp.getLiveContainers().size());
+ assertEquals("App2 failed to Unreserve", 0,
+ schedulerApp.getReservedContainers().size());
+
+ rm.stop();
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org