You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/28 22:27:47 UTC
[20/24] hadoop git commit: YARN-6794. Fair Scheduler to explicitly
promote OPPORTUNISTIC containers locally at the node where they're running.
Contributed by Haibo Chen.
YARN-6794. Fair Scheduler to explicitly promote OPPORTUNISTIC containers locally at the node where they're running. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1dae721
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1dae721
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1dae721
Branch: refs/heads/YARN-1011
Commit: c1dae721e8978fcfb35fb89d6e86f5b14df29bfd
Parents: f9db036
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Jun 20 10:53:20 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 28 14:15:01 2018 -0700
----------------------------------------------------------------------
.../scheduler/SchedulerApplicationAttempt.java | 3 +-
.../scheduler/SchedulerNode.java | 27 +
.../scheduler/fair/FSAppAttempt.java | 32 +-
.../scheduler/fair/FSSchedulerNode.java | 57 +-
.../scheduler/fair/FairScheduler.java | 47 +-
.../TestResourceTrackerService.java | 3 +-
.../scheduler/fair/TestFairScheduler.java | 569 +++++++++++++++++++
7 files changed, 715 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 25a9415..d812e57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -672,7 +672,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public Resource getCurrentConsumption() {
- return attemptResourceUsage.getUsed();
+ // Report the attempt's GUARANTEED usage (attemptResourceUsage) and its
+ // OPPORTUNISTIC usage combined, not GUARANTEED usage alone.
+ return Resources.add(attemptResourceUsage.getUsed(),
+ attemptOpportunisticResourceUsage.getUsed());
}
private Container updateContainerAndNMToken(RMContainer rmContainer,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 3e16dc2..9e38d52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -282,6 +282,33 @@ public abstract class SchedulerNode {
return true;
}
+ /**
+ * Attempt to promote an OPPORTUNISTIC container that has been allocated.
+ * The promotion succeeds only if the container is OPPORTUNISTIC, is still
+ * allocated on this node, and fits in the resources that are not yet
+ * allocated to GUARANTEED containers. On success the container's resource
+ * is moved from the node's OPPORTUNISTIC accounting to its GUARANTEED
+ * accounting.
+ * @param rmContainer the OPPORTUNISTIC container to promote
+ * @return true if the given OPPORTUNISTIC container is promoted,
+ * false otherwise
+ */
+ public synchronized boolean tryToPromoteOpportunisticContainer(
+ RMContainer rmContainer) {
+ // Check explicitly instead of relying on assert, which is normally
+ // disabled in production: "promoting" a container that is not
+ // OPPORTUNISTIC would corrupt the accounting updated below.
+ if (rmContainer.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+ return false;
+ }
+
+ boolean promoted = false;
+ Resource resource = rmContainer.getContainer().getResource();
+ if (allocatedContainers.containsKey(rmContainer.getContainerId()) &&
+ Resources.fitsIn(resource, getUnallocatedResource())) {
+ Resources.subtractFrom(allocatedResourceOpportunistic, resource);
+ numOpportunisticContainers--;
+
+ Resources.addTo(allocatedResourceGuaranteed, resource);
+ numGuaranteedContainers++;
+ Resources.subtractFrom(unallocatedResource, resource);
+
+ promoted = true;
+ }
+
+ return promoted;
+ }
+
/**
* Get resources that are not allocated to GUARANTEED containers on the node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 7b32a9d..b68c245 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -505,6 +505,29 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
/**
+ * Update resource accounting upon promotion of an OPPORTUNISTIC container.
+ * The container's resource is moved from this attempt's OPPORTUNISTIC usage
+ * to its GUARANTEED usage and added to the queue's GUARANTEED usage.
+ * A container that belongs to a different application attempt is ignored.
+ * @param rmContainer the OPPORTUNISTIC container that has been promoted
+ */
+ public void opportunisticContainerPromoted(RMContainer rmContainer) {
+ // only an OPPORTUNISTIC container can be promoted
+ assert (ExecutionType.OPPORTUNISTIC == rmContainer.getExecutionType());
+
+ // the container to be promoted must belong to the current app attempt
+ if (rmContainer.getApplicationAttemptId().equals(
+ getApplicationAttemptId())) {
+ Resource resource = rmContainer.getContainer().getResource();
+ // Acquire the lock before entering try so that the finally block only
+ // ever releases a lock that is actually held.
+ writeLock.lock();
+ try {
+ attemptOpportunisticResourceUsage.decUsed(resource);
+ attemptResourceUsage.incUsed(resource);
+ getQueue().incUsedGuaranteedResource(resource);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+
+ /**
* Should be called when the scheduler assigns a container at a higher
* degree of locality than the current threshold. Reset the allowed locality
* level to a higher degree of locality.
@@ -1159,7 +1182,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
*
* @param node
* Node that the application has an existing reservation on
- * @return whether the reservation on the given node is valid.
+ * @return true if the reservation is turned into an allocation
*/
boolean assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
@@ -1186,8 +1209,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getUnallocatedResource())) {
assignContainer(node, false, true);
+ return true;
}
- return true;
+ return false;
}
/**
@@ -1355,12 +1379,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
@Override
public Resource getGuaranteedResourceUsage() {
- return getCurrentConsumption();
+ return Resources.clone(attemptResourceUsage.getUsed());
}
@Override
public Resource getOpportunisticResourceUsage() {
- return attemptOpportunisticResourceUsage.getUsed();
+ return Resources.clone(attemptOpportunisticResourceUsage.getUsed());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a53dda4..efbe615 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -35,10 +36,13 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -66,6 +70,13 @@ public class FSSchedulerNode extends SchedulerNode {
// slated for preemption
private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
+ // The set of containers that need to be handled before resource
+ // available on the node can be assigned to resource requests.
+ // This is a queue of reserved and opportunistic containers on
+ // the node, kept in insertion (FIFO) order.
+ private final LinkedHashSet<RMContainer> priorityContainers =
+ new LinkedHashSet<>(1);
+
@VisibleForTesting
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
@@ -124,6 +135,7 @@ public class FSSchedulerNode extends SchedulerNode {
+ application.getApplicationId());
}
setReservedContainer(container);
+ priorityContainers.add(container);
this.reservedAppSchedulable = (FSAppAttempt) application;
}
@@ -142,7 +154,7 @@ public class FSSchedulerNode extends SchedulerNode {
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
-
+ priorityContainers.remove(getReservedContainer());
setReservedContainer(null);
this.reservedAppSchedulable = null;
}
@@ -274,6 +286,13 @@ public class FSSchedulerNode extends SchedulerNode {
} else {
LOG.error("Allocated empty container" + rmContainer.getContainerId());
}
+
+ // keep track of opportunistic containers allocated so that we can promote
+ // them before we assign resources available to resource requests.
+ if (ExecutionType.OPPORTUNISTIC.equals(
+ rmContainer.getContainer().getExecutionType())) {
+ priorityContainers.add(rmContainer);
+ }
}
/**
@@ -292,4 +311,40 @@ public class FSSchedulerNode extends SchedulerNode {
containersForPreemption.remove(container);
}
}
+
+ /**
+ * Try to assign resources available to the reserved container and to
+ * opportunistic containers that have been allocated, in the order in which
+ * they were queued. Handling stops at the first candidate that cannot be
+ * satisfied, so that candidates queued later are not served ahead of it.
+ * Queued containers that are no longer OPPORTUNISTIC, or that have reached
+ * a terminal state, are dropped from the queue.
+ * @return the list of opportunistic containers that have been promoted
+ */
+ public synchronized List<RMContainer> handlePriorityContainers() {
+ List<RMContainer> promotedContainers = new ArrayList<>(0);
+
+ // iterate over a snapshot since priorityContainers is modified in the loop
+ List<RMContainer> candidateContainers = new ArrayList<>(priorityContainers);
+ for (RMContainer rmContainer : candidateContainers) {
+ boolean handled = false;
+ boolean isReservedContainer =
+ rmContainer.getReservedSchedulerKey() != null;
+ if (isReservedContainer) {
+ // attempt to assign resources that have been reserved
+ FSAppAttempt reservedApp = getReservedAppSchedulable();
+ boolean allocated = reservedApp != null &&
+ reservedApp.assignReservedContainer(this);
+ // a reservation that no longer exists on the node does not hold up
+ // the candidates queued behind it
+ handled = allocated || getReservedContainer() == null;
+ } else if (isObsoletePromotionCandidate(rmContainer)) {
+ // drop it so that it neither leaks nor blocks later candidates
+ priorityContainers.remove(rmContainer);
+ handled = true;
+ } else if (super.tryToPromoteOpportunisticContainer(rmContainer)) {
+ priorityContainers.remove(rmContainer);
+ handled = true;
+ promotedContainers.add(rmContainer);
+ }
+
+ if (!handled) {
+ // the node cannot satisfy this candidate right now; stop here so
+ // that candidates queued after it are not served ahead of it.
+ break;
+ }
+ }
+ return promotedContainers;
+ }
+
+ /**
+ * Check whether a queued container can no longer be promoted, either
+ * because it is not OPPORTUNISTIC any more or because it has reached a
+ * terminal state.
+ * @param rmContainer the queued container
+ * @return true if the container should be dropped from the queue
+ */
+ private static boolean isObsoletePromotionCandidate(
+ RMContainer rmContainer) {
+ if (rmContainer.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+ return true;
+ }
+ RMContainerState state = rmContainer.getState();
+ return state == RMContainerState.COMPLETED ||
+ state == RMContainerState.KILLED ||
+ state == RMContainerState.RELEASED ||
+ state == RMContainerState.EXPIRED;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 25782a1..31f1961 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -104,6 +106,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1113,8 +1116,8 @@ public class FairScheduler extends
// Assign new containers...
// 1. Ensure containers are assigned to the apps that preempted
- // 2. Check for reserved applications
- // 3. Schedule GUARANTEED containers if there are no reservations
+ // 2. Check for reserved applications or promote OPPORTUNISTIC containers
+ // 3. Schedule GUARANTEED containers
// 4. Schedule OPPORTUNISTIC containers if possible
// Apps may wait for preempted containers
@@ -1123,12 +1126,14 @@ public class FairScheduler extends
// when C does not qualify for preemption itself.
attemptToAssignPreemptedResources(node);
- boolean validReservation = attemptToAssignReservedResources(node);
- if (!validReservation) {
- // only attempt to assign GUARANTEED containers if there is no
- // reservation on the node because
- attemptToAssignResourcesAsGuaranteedContainers(node);
- }
+ // before we assign resources to outstanding resource requests, we
+ // need to assign the resources to either the container that has
+ // made a reservation or allocated OPPORTUNISTIC containers so that
+ // they can be promoted. This ensures that resource requests that
+ // are eligible for guaranteed resources are satisfied in FIFO order
+ attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(node);
+
+ attemptToAssignResourcesAsGuaranteedContainers(node);
// attempt to assign OPPORTUNISTIC containers regardless of whether
// we have made a reservation or assigned a GUARANTEED container
@@ -1143,15 +1148,27 @@ public class FairScheduler extends
}
/**
- * Assign the reserved resource to the application that have reserved it.
+ * Attempt to assign reserved resources and promote OPPORTUNISTIC containers
+ * that have already been allocated on the node. Each promoted container is
+ * accounted to its application attempt, and all promotions for the node are
+ * sent in a single RMNodeUpdateContainerEvent.
*/
- private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
- boolean success = false;
- FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
- if (reservedAppSchedulable != null) {
- success = reservedAppSchedulable.assignReservedContainer(node);
+ private void attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(
+ FSSchedulerNode node) {
+ Map<Container, ContainerUpdateType> promotion = new HashMap<>(0);
+
+ List<RMContainer> promoted = node.handlePriorityContainers();
+ for (RMContainer rmContainer : promoted) {
+ FSAppAttempt appAttempt = getSchedulerApp(
+ rmContainer.getApplicationAttemptId());
+ if (appAttempt == null) {
+ // The attempt may have been removed in the meantime; skip this
+ // container instead of failing the whole scheduling pass with an NPE.
+ LOG.warn("Skipping promotion of container "
+ + rmContainer.getContainerId() + " because application attempt "
+ + rmContainer.getApplicationAttemptId() + " no longer exists");
+ continue;
+ }
+ appAttempt.opportunisticContainerPromoted(rmContainer);
+
+ promotion.put(rmContainer.getContainer(),
+ ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+ }
+
+ if (!promotion.isEmpty()) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeUpdateContainerEvent(node.getNodeID(), promotion));
}
- return success;
}
private void attemptToAssignResourcesAsGuaranteedContainers(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e40b3c0..360d1af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -2309,9 +2309,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rc.getExecutionType());
}
- // Should only include GUARANTEED resources
+ // Should include both GUARANTEED and OPPORTUNISTIC resources
currentConsumption = applicationAttempt.getCurrentConsumption();
- Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption);
+ Assert.assertEquals(Resource.newInstance(5120, 3), currentConsumption);
+ // queue metrics are still expected to report only the GUARANTEED allocation
allocResources =
applicationAttempt.getQueue().getMetrics().getAllocatedResources();
Assert.assertEquals(Resource.newInstance(2048, 1), allocResources);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5878ccd..7fbf84a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3353,6 +3353,575 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
}
+ /**
+ * Test promotion of a single OPPORTUNISTIC container when no resources are
+ * reserved on the node where the container is allocated.
+ */
+ @Test
+ public void testSingleOpportunisticContainerPromotionWithoutReservation()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ "yarn.resource-types.memory-mb.increment-allocation", 1024);
+ conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create two scheduling requests that leave no unallocated resources
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(2048, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers1.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(2048, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers2.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // node utilization is low after the two containers start running
+ ContainerStatus container1Status = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ ContainerStatus container2Status = ContainerStatus.newInstance(
+ allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+ containerStatuses.add(container1Status);
+ containerStatuses.add(container2Status);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+ // create another scheduling request that asks for more than what's left
+ // unallocated on the node but can be served with overallocation.
+ ApplicationAttemptId appAttempt3 =
+ createSchedulingRequest(1024, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ List<Container> allocatedContainers3 =
+ scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+ assertEquals(1, allocatedContainers3.size());
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers3.get(0).getExecutionType());
+ assertTrue("No reservation should be made for the third request",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+ // now the first GUARANTEED container finishes; report it as COMPLETE
+ // since it is delivered in the list of completed containers
+ List<ContainerStatus> finishedContainers = Collections.singletonList(
+ ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+ ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+ // the OPPORTUNISTIC container should be promoted
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+ memoryAllocationIncrement);
+ }
+ }
+
+ /**
+ * Test promotion of two OPPORTUNISTIC containers when no resources are
+ * reserved on the node where the container is allocated.
+ */
+ @Test
+ public void testMultipleOpportunisticContainerPromotionWithoutReservation()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ "yarn.resource-types.memory-mb.increment-allocation", 1024);
+ conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create two scheduling requests that leave no unallocated resources
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(2048, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(2048, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers2.get(0).getExecutionType());
+
+ // node utilization is low after the two container run on the node
+ ContainerStatus container1Status = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ ContainerStatus container2Status = ContainerStatus.newInstance(
+ allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+ containerStatuses.add(container1Status);
+ containerStatuses.add(container2Status);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+ // create another scheduling request that asks for more than what's left
+ // unallocated on the node but can be served with overallocation.
+ ApplicationAttemptId appAttempt3 =
+ createSchedulingRequest(1536, "queue2", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers3 =
+ scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers3.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers3.get(0).getExecutionType());
+ assertTrue("No reservation should be made for the third request",
+ scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+ // node utilization is low after the third container run on the node
+ ContainerStatus container3Status = ContainerStatus.newInstance(
+ allocatedContainers3.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(container3Status),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(2000, 0, 0.2f));
+
+ // create another scheduling request that asks for more than what's left
+ // unallocated on the node but can be served with overallocation.
+ ApplicationAttemptId appAttempt4 =
+ createSchedulingRequest(1024, "queue3", "user1", 1);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers4 =
+ scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers4.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.OPPORTUNISTIC,
+ allocatedContainers4.get(0).getExecutionType());
+
+ // now the first GUARANTEED container finishes
+ List<ContainerStatus> finishedContainers = Collections.singletonList(
+ ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+ ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ // the first OPPORTUNISTIC container should be promoted
+ assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getOpportunisticResourceUsage().getMemorySize());
+ // the second OPPORLTUNISTIC container should not be promoted
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+ getGuaranteedResourceUsage().getMemorySize());
+
+ // now the second GUARANTEED container finishes
+ finishedContainers = Collections.singletonList(
+ ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+ ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+ ResourceUtilization.newInstance(3000, 0, 0.1f));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ // the second OPPORTUNISTIC container should be promoted
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+ getGuaranteedResourceUsage().getMemorySize());
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+ getOpportunisticResourceUsage().getMemorySize());
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+ memoryAllocationIncrement);
+ }
+ }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when resources were
+   * reserved before the container was allocated. When resources are
+   * released, the scheduler should satisfy the reservation first, and only
+   * promote the OPPORTUNISTIC container once more resources are released.
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPriorReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler so that the
+    // 2000MB request below is not rounded up
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+      FSSchedulerNode schedulerNode = scheduler.getNode(node.getNodeID());
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers2.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // report both GUARANTEED containers as running, with node utilization
+      // (1024MB of 4096MB) well below the 0.75 overallocation threshold
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that opts out of oversubscription;
+      // it cannot be allocated right away, so a reservation is expected
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(2000, "queue2", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertEquals(0, allocatedContainers3.size());
+      // verify that a reservation is made for the third request
+      assertEquals("A reservation should be made for the third request",
+          Resource.newInstance(2000, 1),
+          schedulerNode.getReservedContainer().getReservedResource());
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers4.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers4.get(0).getExecutionType());
+      assertEquals("A reservation should still be made for the third request",
+          Resource.newInstance(2000, 1),
+          schedulerNode.getReservedContainer().getReservedResource());
+
+      // now the first GUARANTEED container finishes; it is reported in the
+      // completed-containers list, so its state is COMPLETE
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the third request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers3.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("The reservation for the third request should be canceled",
+          schedulerNode.getReservedContainer() == null);
+      // the OPPORTUNISTIC container should not be promoted given the released
+      // resources are taken by handling the reservation
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when resources are
+   * reserved after the container was allocated. When resources are
+   * released, the scheduler should promote the OPPORTUNISTIC container
+   * first, and only satisfy the reservation once more resources are
+   * released.
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPostReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler so that the
+    // 2000MB request below is not rounded up
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+      FSSchedulerNode schedulerNode = scheduler.getNode(node.getNodeID());
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers1.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers2.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // report both GUARANTEED containers as running, with node utilization
+      // (1024MB of 4096MB) well below the 0.75 overallocation threshold
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers3.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          schedulerNode.getReservedContainer() == null);
+
+      // create another scheduling request that opts out of oversubscription;
+      // it cannot be allocated right away, so a reservation is expected
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(2000, "queue3", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertEquals(0, allocatedContainers4.size());
+      // verify that a reservation is made for the fourth request
+      assertEquals("A reservation should be made for the fourth request",
+          Resource.newInstance(2000, 1),
+          schedulerNode.getReservedContainer().getReservedResource());
+
+      // now the first GUARANTEED container finishes; it is reported in the
+      // completed-containers list, so its state is COMPLETE
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals("A reservation should still be made for the fourth request",
+          Resource.newInstance(2000, 1),
+          schedulerNode.getReservedContainer().getReservedResource());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the fourth request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertEquals(1, allocatedContainers4.size());
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers4.get(0).getExecutionType());
+      assertTrue("The reservation for the fourth request should be canceled",
+          schedulerNode.getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
@Test
public void testAclSubmitApplication() throws Exception {
// Set acl's
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org