You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/01/29 05:10:59 UTC
[41/50] [abbrv] hadoop git commit: YARN-1015. FS should watch node
resource utilization and allocate opportunistic containers if appropriate.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ef5de39..8756c7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -55,14 +55,20 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -72,6 +78,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -93,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -1056,15 +1065,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getVirtualCores());
+ getGuaranteedResourceUsage().getVirtualCores());
// verify metrics
QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
@@ -1099,7 +1108,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 1 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1109,7 +1118,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
// Now another node checks in with capacity
@@ -1123,7 +1132,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure this goes to queue 2
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// The old reservation should still be there...
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
@@ -1133,7 +1142,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
- @Test (timeout = 5000)
+ @Test
public void testOffSwitchAppReservationThreshold() throws Exception {
conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
scheduler.init(conf);
@@ -1173,7 +1182,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Verify capacity allocation
assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Create new app with a resource request that can be satisfied by any
// node but would be
@@ -1205,7 +1214,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1266,7 +1275,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Verify capacity allocation
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Create new app with a resource request that can be satisfied by any
// node but would be
@@ -1311,7 +1320,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node4));
assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
scheduler.handle(new NodeUpdateSchedulerEvent(node1));
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1355,7 +1364,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Verify capacity allocation
assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Verify number of reservations have decremented
assertEquals(0,
@@ -1399,7 +1408,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 1 is allocated app capacity
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1408,7 +1417,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 2 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
@@ -1534,7 +1543,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 1 is allocated app capacity
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests likewise
createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1543,7 +1552,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 2 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
@@ -1583,12 +1592,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure allocated memory of queue1 doesn't exceed its maximum
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
//the reservation of queue1 should be reclaim
assertEquals(0, scheduler.getSchedulerApp(attId1).
getCurrentReservation().getMemorySize());
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
}
@Test
@@ -1628,7 +1637,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 1 is allocated app capacity
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// Now queue 2 requests below threshold
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1637,7 +1646,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 2 has no reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(0,
scheduler.getSchedulerApp(attId).getReservedContainers().size());
@@ -1648,7 +1657,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
.getVirtualCores());
@@ -1663,7 +1672,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Make sure this goes to queue 2
assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
- getResourceUsage().getVirtualCores());
+ getGuaranteedResourceUsage().getVirtualCores());
// The old reservation should still be there...
assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
@@ -2696,7 +2705,361 @@ public class TestFairScheduler extends FairSchedulerTestBase {
2, liveContainers.iterator().next().getContainer().
getPriority().getPriority());
}
-
+
+ /**
+ * Test that NO OPPORTUNISTIC containers can be allocated on a node that
+ * is fully allocated and has a very high utilization.
+ */
+ @Test
+ public void testAllocateNoOpportunisticContainersOnBusyNode()
+ throws IOException {
+ conf.setBoolean(
+ YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 2G of memory and 2 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(2048, 2), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that takes up the node's full memory
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(2048, "queue1", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization shoots up after the container runs on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(2000, 0, 0.8f));
+
+ // create another scheduling request
+ ApplicationAttemptId appAttempt2
+ = createSchedulingRequest(100, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue("Expecting no containers allocated",
+ allocatedContainers2.size() == 0);
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+
+ // verify that a reservation is made for the second resource request
+ Resource reserved = scheduler.getNode(node.getNodeID()).
+ getReservedContainer().getReservedResource();
+ assertTrue("Expect a reservation made for the second resource request",
+ reserved.equals(Resource.newInstance(100, 1)));
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node with low
+ * utilization even though there is not enough unallocated resource on the
+ * node to accommodate the request.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+ throws IOException {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that leaves 496 MB of memory unallocated
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3600, "queue1", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+ // create another scheduling request that asks for more than what's left
+ // unallocated on the node but can be served with overallocation.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // verify that no reservation is made for the second request given
+ // that it's satisfied by an OPPORTUNISTIC container allocation.
+ assertTrue("No reservation should be made because we have satisfied" +
+ " the second request with an OPPORTUNISTIC container allocation",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node that is
+ * fully allocated but whose utilization is very low.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+ throws IOException {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that takes up the whole node
+ ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+ 4096, "queue1", "user1", 4);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+ // create another scheduling request; since there is no unallocated
+ // memory left on the node, the request should be served with an
+ // allocation of an OPPORTUNISTIC container
+ ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+ 1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // verify that no reservation is made for the second request given
+ // that it's satisfied by an OPPORTUNISTIC container allocation.
+ assertTrue("No reservation should be made because we have satisfied" +
+ " the second request with an OPPORTUNISTIC container allocation",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test that OPPORTUNISTIC containers can be allocated on a node with a low
+ * utilization even though there are GUARANTEED containers allocated.
+ */
+ @Test
+ public void testAllocateOpportunisticContainersWithGuaranteedOnes()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3200, "queue1", "user1", 3);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3200, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container is launched on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(512, 0, 0.1f));
+
+ // create two other scheduling requests which in aggregate ask for more
+ // than what's left unallocated on the node.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(512, "queue2", "user1", 1);
+ ApplicationAttemptId appAttempt3 =
+ createSchedulingRequest(1024, "queue3", "user1", 1);
+
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(512, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers2.get(0).getExecutionType());
+
+ List<Container> allocatedContainers3 =
+ scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers3.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers3.get(0).getExecutionType());
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+
+ // verify that no reservation is made given that the second request should
+ // be satisfied by a GUARANTEED container allocation, the third by an
+ // OPPORTUNISTIC container allocation.
+ assertTrue("No reservation should be made.",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
@Test
public void testAclSubmitApplication() throws Exception {
// Set acl's
@@ -3686,7 +4049,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.FINISHED);
- assertEquals(Resources.none(), app1.getResourceUsage());
+ assertEquals(Resources.none(), app1.getGuaranteedResourceUsage());
}
@Test
@@ -3786,7 +4149,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application1's AM should be finished",
0, app1.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app1.getResourceUsage());
+ Resources.none(), app1.getGuaranteedResourceUsage());
assertEquals("Application3's AM should be running",
1, app3.getLiveContainers().size());
assertEquals("Application3's AM requests 1024 MB memory",
@@ -3806,7 +4169,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application4's AM should not be running",
0, app4.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app4.getResourceUsage());
+ Resources.none(), app4.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3822,7 +4185,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application5's AM should not be running",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3835,7 +4198,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application5's AM should not be running",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
@@ -3851,11 +4214,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application2's AM should be finished",
0, app2.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app2.getResourceUsage());
+ Resources.none(), app2.getGuaranteedResourceUsage());
assertEquals("Application3's AM should be finished",
0, app3.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app3.getResourceUsage());
+ Resources.none(), app3.getGuaranteedResourceUsage());
assertEquals("Application5's AM should be running",
1, app5.getLiveContainers().size());
assertEquals("Application5's AM requests 2048 MB memory",
@@ -3876,7 +4239,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application5's AM should have 0 container",
0, app5.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app5.getResourceUsage());
+ Resources.none(), app5.getGuaranteedResourceUsage());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
2048, queue1.getAmResourceUsage().getMemorySize());
scheduler.update();
@@ -3900,7 +4263,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Application6's AM should not be running",
0, app6.getLiveContainers().size());
assertEquals("Finished application usage should be none",
- Resources.none(), app6.getResourceUsage());
+ Resources.none(), app6.getGuaranteedResourceUsage());
assertEquals("Application6's AM resource shouldn't be updated",
0, app6.getAMResource().getMemorySize());
assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -4617,17 +4980,25 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
- Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
- Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
- Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
- Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
+ Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+ 0);
+ Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+ 0);
+ Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+ 1 * GB);
+ Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+ 1 * GB);
scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
- Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
- Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
- Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
- Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
+ Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+ 1 * GB);
+ Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+ 1 * GB);
+ Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+ 0);
+ Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+ 0);
}
@Test (expected = YarnException.class)
@@ -4667,7 +5038,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.handle(updateEvent);
scheduler.handle(updateEvent);
- assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage());
+ assertEquals(Resource.newInstance(2048, 2),
+ oldQueue.getGuaranteedResourceUsage());
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}
@@ -5091,7 +5463,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
//container will be reserved at node1
RMContainer reservedContainer1 =
@@ -5111,7 +5483,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
app1, RMAppAttemptState.KILLED, false));
assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
- getResourceUsage().getMemorySize());
+ getGuaranteedResourceUsage().getMemorySize());
// container will be allocated at node2
scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -5259,10 +5631,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSAppAttempt app1 = mock(FSAppAttempt.class);
Mockito.when(app1.getDemand()).thenReturn(maxResource);
- Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none());
+ Mockito.when(app1.getGuaranteedResourceUsage()).
+ thenReturn(Resources.none());
FSAppAttempt app2 = mock(FSAppAttempt.class);
Mockito.when(app2.getDemand()).thenReturn(maxResource);
- Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none());
+ Mockito.when(app2.getGuaranteedResourceUsage()).
+ thenReturn(Resources.none());
QueueManager queueManager = scheduler.getQueueManager();
FSParentQueue queue1 = queueManager.getParentQueue("queue1", true);
@@ -5318,7 +5692,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
child1.setMaxShare(new ConfigurableResource(resource));
FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(resource);
- Mockito.when(app.getResourceUsage()).thenReturn(resource);
+ Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource);
child1.addApp(app, true);
child1.updateDemand();
@@ -5354,7 +5728,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
+ " SteadyFairShare: <memory:0, vCores:0>,"
+ " MaxShare: <memory:4096, vCores:4>,"
+ " MinShare: <memory:0, vCores:0>,"
- + " ResourceUsage: <memory:4096, vCores:4>,"
+ + " Guaranteed ResourceUsage: <memory:4096, vCores:4>,"
+ " Demand: <memory:4096, vCores:4>,"
+ " MaxAMShare: 0.5,"
+ " Runnable: 0}";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index b016c1b..6777b5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -243,11 +243,16 @@ public class TestSchedulingPolicy {
}
@Override
- public Resource getResourceUsage() {
+ public Resource getGuaranteedResourceUsage() {
return usage;
}
@Override
+ public Resource getOpportunisticResourceUsage() {
+ return Resource.newInstance(0, 0);
+ }
+
+ @Override
public Resource getMinShare() {
return minShare;
}
@@ -278,7 +283,8 @@ public class TestSchedulingPolicy {
}
@Override
- public Resource assignContainer(FSSchedulerNode node) {
+ public Resource assignContainer(FSSchedulerNode node,
+ boolean opportunistic) {
throw new UnsupportedOperationException();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org