Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/28 22:27:47 UTC

[20/24] hadoop git commit: YARN-6794. Fair Scheduler to explicitly promote OPPORTUNISTIC containers locally at the node where they're running. Contributed by Haibo Chen.

YARN-6794. Fair Scheduler to explicitly promote OPPORTUNISTIC containers locally at the node where they're running. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1dae721
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1dae721
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1dae721

Branch: refs/heads/YARN-1011
Commit: c1dae721e8978fcfb35fb89d6e86f5b14df29bfd
Parents: f9db036
Author: Miklos Szegedi <sz...@apache.org>
Authored: Wed Jun 20 10:53:20 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 28 14:15:01 2018 -0700

----------------------------------------------------------------------
 .../scheduler/SchedulerApplicationAttempt.java  |   3 +-
 .../scheduler/SchedulerNode.java                |  27 +
 .../scheduler/fair/FSAppAttempt.java            |  32 +-
 .../scheduler/fair/FSSchedulerNode.java         |  57 +-
 .../scheduler/fair/FairScheduler.java           |  47 +-
 .../TestResourceTrackerService.java             |   3 +-
 .../scheduler/fair/TestFairScheduler.java       | 569 +++++++++++++++++++
 7 files changed, 715 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 25a9415..d812e57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -672,7 +672,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
   
   public Resource getCurrentConsumption() {
-    return attemptResourceUsage.getUsed();
+    return Resources.add(attemptResourceUsage.getUsed(),
+        attemptOpportunisticResourceUsage.getUsed());
   }
   
   private Container updateContainerAndNMToken(RMContainer rmContainer,
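
Note on the hunk above: getCurrentConsumption() previously returned only
GUARANTEED usage; it now reports the sum of GUARANTEED and OPPORTUNISTIC
usage. A minimal sketch of that arithmetic, assuming the YARN client
libraries on the classpath (the resource values are illustrative):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class CurrentConsumptionSketch {
      public static void main(String[] args) {
        // usage is tracked separately per execution type
        Resource guaranteed = Resource.newInstance(2048, 1);
        Resource opportunistic = Resource.newInstance(3072, 2);
        // Resources.add returns a new Resource; neither input is mutated
        Resource total = Resources.add(guaranteed, opportunistic);
        System.out.println(total); // prints <memory:5120, vCores:3>
      }
    }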

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 3e16dc2..9e38d52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -282,6 +282,33 @@ public abstract class SchedulerNode {
     return true;
   }
 
+  /**
+   * Attempt to promote an OPPORTUNISTIC container that has been allocated.
+   * @param rmContainer the OPPORTUNISTIC container to promote
+   * @return true if the given OPPORTUNISTIC container is promoted,
+   *         false otherwise
+   */
+  public synchronized boolean tryToPromoteOpportunisticContainer(
+      RMContainer rmContainer) {
+    assert (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC);
+
+    boolean promoted = false;
+    Resource resource = rmContainer.getContainer().getResource();
+    if (allocatedContainers.containsKey(rmContainer.getContainerId()) &&
+        Resources.fitsIn(resource, getUnallocatedResource())) {
+      Resources.subtractFrom(allocatedResourceOpportunistic, resource);
+      numOpportunisticContainers--;
+
+      Resources.addTo(allocatedResourceGuaranteed, resource);
+      numGuaranteedContainers++;
+      Resources.subtractFrom(unallocatedResource, resource);
+
+      promoted = true;
+    }
+
+    return promoted;
+  }
+
 
   /**
    * Get resources that are not allocated to GUARANTEED containers on the node.
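
tryToPromoteOpportunisticContainer() above is pure bookkeeping under the
node lock: if the container is still allocated and its resource fits in
what is unallocated, the counters move from the opportunistic side to the
guaranteed side. A small sketch of the guard and the mutation semantics,
assuming the same YARN utility classes (values are illustrative):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class PromotionGuardSketch {
      public static void main(String[] args) {
        Resource unallocated = Resource.newInstance(2048, 2);
        Resource container = Resource.newInstance(1024, 1);
        // fitsIn is true only if every dimension of container fits
        if (Resources.fitsIn(container, unallocated)) {
          // subtractFrom mutates its first argument in place, which is
          // why the promotion path deducts from unallocatedResource
          Resources.subtractFrom(unallocated, container);
          System.out.println("promoted, left: " + unallocated);
        }
      }
    }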

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 7b32a9d..b68c245 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -505,6 +505,29 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
+   * Update resource accounting upon promotion of an OPPORTUNISTIC container.
+   * @param rmContainer the OPPORTUNISTIC container that has been promoted
+   */
+  public void opportunisticContainerPromoted(RMContainer rmContainer) {
+    // only an OPPORTUNISTIC container can be promoted
+    assert (ExecutionType.OPPORTUNISTIC == rmContainer.getExecutionType());
+
+    // the container to be promoted must belong to the current app attempt
+    if (rmContainer.getApplicationAttemptId().equals(
+        getApplicationAttemptId())) {
+      Resource resource = rmContainer.getContainer().getResource();
+      try {
+        writeLock.lock();
+        attemptOpportunisticResourceUsage.decUsed(resource);
+        attemptResourceUsage.incUsed(resource);
+        getQueue().incUsedGuaranteedResource(resource);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
+
+  /**
    * Should be called when the scheduler assigns a container at a higher
    * degree of locality than the current threshold. Reset the allowed locality
    * level to a higher degree of locality.
@@ -1159,7 +1182,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    *
    * @param node
    *     Node that the application has an existing reservation on
-   * @return whether the reservation on the given node is valid.
+   * @return true if the reservation is turned into an allocation
    */
   boolean assignReservedContainer(FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
@@ -1186,8 +1209,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
         node.getUnallocatedResource())) {
       assignContainer(node, false, true);
+      return true;
     }
-    return true;
+    return false;
   }
 
   /**
@@ -1355,12 +1379,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   @Override
   public Resource getGuaranteedResourceUsage() {
-    return getCurrentConsumption();
+    return Resources.clone(attemptResourceUsage.getUsed());
   }
 
   @Override
   public Resource getOpportunisticResourceUsage() {
-    return attemptOpportunisticResourceUsage.getUsed();
+    return Resources.clone(attemptOpportunisticResourceUsage.getUsed());
   }
 
   @Override
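
The two getters above now hand out Resources.clone(...) instead of the
internal Resource objects. Resource is mutable, so returning the internal
instance would let a caller corrupt the attempt's accounting; a short
sketch of the hazard the clone avoids (illustrative, same YARN classes):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class DefensiveCopySketch {
      public static void main(String[] args) {
        Resource internal = Resource.newInstance(2048, 2);
        // a clone isolates internal state from the caller
        Resource safe = Resources.clone(internal);
        safe.setMemorySize(0); // only the copy changes
        System.out.println(internal); // still <memory:2048, vCores:2>
      }
    }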

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a53dda4..efbe615 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -35,10 +36,13 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -66,6 +70,13 @@ public class FSSchedulerNode extends SchedulerNode {
   // slated for preemption
   private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
+  // The set of containers that need to be handled before resources
+  // available on the node can be assigned to resource requests.
+  // This is a queue of reserved and opportunistic containers on
+  // the node.
+  private final LinkedHashSet<RMContainer> priorityContainers =
+      new LinkedHashSet<>(1);
+
   @VisibleForTesting
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
@@ -124,6 +135,7 @@ public class FSSchedulerNode extends SchedulerNode {
           + application.getApplicationId());
     }
     setReservedContainer(container);
+    priorityContainers.add(container);
     this.reservedAppSchedulable = (FSAppAttempt) application;
   }
 
@@ -142,7 +154,7 @@ public class FSSchedulerNode extends SchedulerNode {
           " for application " + reservedApplication.getApplicationId() + 
           " on node " + this);
     }
-    
+    priorityContainers.remove(getReservedContainer());
     setReservedContainer(null);
     this.reservedAppSchedulable = null;
   }
@@ -274,6 +286,13 @@ public class FSSchedulerNode extends SchedulerNode {
     } else {
       LOG.error("Allocated empty container" + rmContainer.getContainerId());
     }
+
+    // keep track of allocated OPPORTUNISTIC containers so that we can
+    // promote them before assigning available resources to resource requests.
+    if (ExecutionType.OPPORTUNISTIC.equals(
+        rmContainer.getContainer().getExecutionType())) {
+      priorityContainers.add(rmContainer);
+    }
   }
 
   /**
@@ -292,4 +311,40 @@ public class FSSchedulerNode extends SchedulerNode {
       containersForPreemption.remove(container);
     }
   }
+
+  /**
+   * Try to assign available resources to the reserved container and to
+   * OPPORTUNISTIC containers that have already been allocated.
+   * @return the list of opportunistic containers that have been promoted
+   */
+  public synchronized List<RMContainer> handlePriorityContainers() {
+    List<RMContainer> promotedContainers = new ArrayList<>(0);
+
+    List<RMContainer> candidateContainers = new ArrayList<>(priorityContainers);
+    for (RMContainer rmContainer : candidateContainers) {
+      // track whether this container is handled; a failure means the
+      // node has no resources left for promotion
+      boolean assigned = false;
+      boolean isReservedContainer =
+          rmContainer.getReservedSchedulerKey() != null;
+      if (isReservedContainer) {
+        // attempt to assign resources that have been reserved
+        FSAppAttempt reservedApp = getReservedAppSchedulable();
+        if (reservedApp != null) {
+          assigned = reservedApp.assignReservedContainer(this);
+        }
+      } else {
+        if (super.tryToPromoteOpportunisticContainer(rmContainer)) {
+          priorityContainers.remove(rmContainer);
+          assigned = true;
+          promotedContainers.add(rmContainer);
+        }
+      }
+
+      if (!assigned) {
+        // break out of the loop because a failed assignment indicates
+        // there are no more resources available for promotion.
+        break;
+      }
+    }
+    return promotedContainers;
+  }
 }
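
handlePriorityContainers() walks priorityContainers in insertion order, so
reservations and OPPORTUNISTIC allocations are revisited in the order they
arrived on the node. That FIFO behavior comes straight from LinkedHashSet;
a minimal illustration in plain Java:

    import java.util.LinkedHashSet;

    public class FifoOrderSketch {
      public static void main(String[] args) {
        // LinkedHashSet iterates in insertion order, and re-adding an
        // existing element does not move it to the back
        LinkedHashSet<String> priority = new LinkedHashSet<>();
        priority.add("container_1");
        priority.add("container_2");
        priority.add("container_1");
        priority.forEach(System.out::println); // container_1, container_2
      }
    }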

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 25782a1..31f1961 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -104,6 +106,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1113,8 +1116,8 @@ public class FairScheduler extends
 
       // Assign new containers...
       // 1. Ensure containers are assigned to the apps that preempted
-      // 2. Check for reserved applications
-      // 3. Schedule GUARANTEED containers if there are no reservations
+      // 2. Check for reserved applications or promote OPPORTUNISTIC containers
+      // 3. Schedule GUARANTEED containers
       // 4. Schedule OPPORTUNISTIC containers if possible
 
       // Apps may wait for preempted containers
@@ -1123,12 +1126,14 @@ public class FairScheduler extends
       // when C does not qualify for preemption itself.
       attemptToAssignPreemptedResources(node);
 
-      boolean validReservation =  attemptToAssignReservedResources(node);
-      if (!validReservation) {
-        // only attempt to assign GUARANTEED containers if there is no
-        // reservation on the node because
-        attemptToAssignResourcesAsGuaranteedContainers(node);
-      }
+      // before we assign resources to outstanding resource requests, we
+      // need to assign resources either to the container that has made
+      // a reservation or to allocated OPPORTUNISTIC containers so that
+      // they can be promoted. This ensures that resource requests that
+      // are eligible for guaranteed resources are satisfied in FIFO order.
+      attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(node);
+
+      attemptToAssignResourcesAsGuaranteedContainers(node);
 
       // attempt to assign OPPORTUNISTIC containers regardless of whether
       // we have made a reservation or assigned a GUARANTEED container
@@ -1143,15 +1148,27 @@ public class FairScheduler extends
   }
 
   /**
-   * Assign the reserved resource to the application that have reserved it.
+   * Attempt to assign reserved resources and promote OPPORTUNISTIC containers
+   * that have already been allocated.
    */
-  private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
-    boolean success = false;
-    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-    if (reservedAppSchedulable != null) {
-      success = reservedAppSchedulable.assignReservedContainer(node);
+  private void attemptToAssignReservedResourcesOrPromoteOpportunisticContainers(
+      FSSchedulerNode node) {
+    Map<Container, ContainerUpdateType> promotion = new HashMap<>(0);
+
+    List<RMContainer> promoted = node.handlePriorityContainers();
+    for (RMContainer rmContainer : promoted) {
+      FSAppAttempt appAttempt = getSchedulerApp(
+          rmContainer.getApplicationAttemptId());
+      appAttempt.opportunisticContainerPromoted(rmContainer);
+
+      promotion.put(rmContainer.getContainer(),
+          ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+    }
+
+    if (!promotion.isEmpty()) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeUpdateContainerEvent(node.getNodeID(), promotion));
     }
-    return success;
   }
 
   private void attemptToAssignResourcesAsGuaranteedContainers(
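
The promotion path above batches all promotions for a node into a single
Map and fires one RMNodeUpdateContainerEvent, rather than one event per
container. A generic sketch of that batching pattern (the names below are
illustrative, not the RM event API):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchUpdateSketch {
      enum UpdateType { PROMOTE_EXECUTION_TYPE }

      public static void main(String[] args) {
        List<String> promoted = Arrays.asList("container_1", "container_2");
        // collect every promotion on the node into one map ...
        Map<String, UpdateType> updates = new HashMap<>();
        for (String c : promoted) {
          updates.put(c, UpdateType.PROMOTE_EXECUTION_TYPE);
        }
        // ... and notify the node once, not once per container
        if (!updates.isEmpty()) {
          System.out.println("single node update: " + updates);
        }
      }
    }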

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e40b3c0..360d1af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -2309,9 +2309,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
           rc.getExecutionType());
     }
 
-    // Should only include GUARANTEED resources
     currentConsumption = applicationAttempt.getCurrentConsumption();
-    Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption);
+    Assert.assertEquals(Resource.newInstance(5120, 3), currentConsumption);
     allocResources =
         applicationAttempt.getQueue().getMetrics().getAllocatedResources();
     Assert.assertEquals(Resource.newInstance(2048, 1), allocResources);
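
The updated expectation follows from the getCurrentConsumption() change:
the attempt holds (2048 MB, 1 vcore) of GUARANTEED resources (queue
metrics, which count only GUARANTEED usage in this test, are unchanged),
so the new total of (5120 MB, 3 vcores) implies (3072 MB, 2 vcores) of
OPPORTUNISTIC usage: 2048 + 3072 = 5120 and 1 + 2 = 3.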

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1dae721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5878ccd..7fbf84a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3353,6 +3353,575 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }
 
+  /**
+   * Test promotion of a single OPPORTUNISTIC container when no resources are
+   * reserved on the node where the container is allocated.
+   */
+  @Test
+  public void testSingleOpportunisticContainerPromotionWithoutReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of two OPPORTUNISTIC containers when no resources are
+   * reserved on the node where the containers are allocated.
+   */
+  @Test
+  public void testMultipleOpportunisticContainerPromotionWithoutReservation()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1536, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // node utilization is low after the third container runs on the node
+      ContainerStatus container3Status = ContainerStatus.newInstance(
+          allocatedContainers3.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(container3Status),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(2000, 0, 0.2f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers4.get(0).getExecutionType());
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      // the first OPPORTUNISTIC container should be promoted
+      assertEquals(1536, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      // the second OPPORTUNISTIC container should not be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      // the second OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when resources are
+   * reserved before the container is allocated. The scheduler should
+   * satisfy the reservation first before it promotes the OPPORTUNISTIC
+   * container when resources are released.
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPriorReservation()
+      throws Exception {
+
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that opts out of oversubscription
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(2000, "queue2", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 0);
+      // verify that a reservation is made for the third request
+      assertTrue("A reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers4.get(0).getExecutionType());
+      assertTrue("A reservation should still be made for the second request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the third request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("The reservation for the third request should be canceled",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+      // the OPPORTUNISTIC container should not be promoted because the
+      // released resources are consumed by satisfying the reservation
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test promotion of an OPPORTUNISTIC container when resources are
+   * reserved after the container is allocated. The scheduler should
+   * promote the OPPORTUNISTIC container before it satisfies the reservation
+   * when resources are released.
+   */
+  @Test
+  public void testOpportunisticContainerPromotionWithPostReservation()
+      throws Exception {
+
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two containers run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // create another scheduling request that opts out of oversubscription
+      ApplicationAttemptId appAttempt4 =
+          createSchedulingRequest(2000, "queue3", "user1", 1, true);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 0);
+      // verify that a reservation is made for the fourth request
+      assertTrue("A reservation should be made for the fourth request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the OPPORTUNISTIC container should be promoted
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      assertTrue("A reservation should still be made for the fourth request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer().
+              getReservedResource().equals(Resource.newInstance(2000, 1)));
+
+      // now the second GUARANTEED container finishes
+      finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers2.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(3000, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      // the reserved container of the fourth request that opted out of
+      // oversubscription should now be satisfied with a GUARANTEED container
+      assertEquals(2000, scheduler.getQueueManager().getQueue("queue3").
+          getGuaranteedResourceUsage().getMemorySize());
+      allocatedContainers4 =
+          scheduler.getSchedulerApp(appAttempt4).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers4.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers4.get(0).getExecutionType());
+      assertTrue("The reservation for the fourth request should be canceled",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's

