Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:08:12 UTC

[50/50] [abbrv] hadoop git commit: YARN-6672. Add NM preemption of opportunistic containers when utilization goes high.

YARN-6672. Add NM preemption of opportunistic containers when utilization goes high.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/124b378f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/124b378f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/124b378f

Branch: refs/heads/YARN-1011
Commit: 124b378ff2d54db4d6dcac032a816b6cf26d7209
Parents: 23ad2fb
Author: Haibo Chen <ha...@apache.org>
Authored: Mon Jul 2 15:37:47 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 21 17:48:55 2018 -0700

----------------------------------------------------------------------
 .../monitor/ContainersMonitorImpl.java          |  85 ++-
 .../AllocationBasedResourceTracker.java         |   5 +
 .../scheduler/ContainerScheduler.java           |  59 +-
 .../scheduler/ContainerSchedulerEventType.java  |   4 +-
 ...rSchedulerOverallocationPreemptionEvent.java |  45 ++
 .../scheduler/NMAllocationPreemptionPolicy.java |  54 ++
 .../scheduler/ResourceUtilizationTracker.java   |   7 +
 ...shotBasedOverAllocationPreemptionPolicy.java |  81 +++
 .../BaseContainerManagerTest.java               | 126 ++++
 .../TestContainerSchedulerQueuing.java          | 167 +++--
 ...estContainerSchedulerWithOverAllocation.java | 650 +++++++++++--------
 ...shotBasedOverAllocationPreemptionPolicy.java | 259 ++++++++
 12 files changed, 1229 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
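
In short, the patch makes the ContainersMonitor's periodic check bidirectional: utilization below the over-allocation threshold fires SCHEDULE_CONTAINERS to launch queued OPPORTUNISTIC containers, while utilization above the preemption threshold fires PREEMPT_CONTAINERS to reclaim resources. A minimal, self-contained sketch of that decision band (the threshold values here are illustrative, not YARN defaults):

// Simplified model of the check ContainersMonitorImpl#checkUtilization()
// performs each monitoring interval. Threshold values are illustrative.
public class UtilizationBandSketch {
  static final float OVER_ALLOCATION_THRESHOLD = 0.75f; // below: launch more
  static final float PREEMPTION_THRESHOLD = 0.95f;      // above: preempt

  enum Action { SCHEDULE_CONTAINERS, PREEMPT_CONTAINERS, NONE }

  static Action check(float utilization) {
    if (utilization < OVER_ALLOCATION_THRESHOLD) {
      // room to over-allocate: start queued OPPORTUNISTIC containers
      return Action.SCHEDULE_CONTAINERS;
    }
    if (utilization > PREEMPTION_THRESHOLD) {
      // too hot: reclaim resources from OPPORTUNISTIC containers
      return Action.PREEMPT_CONTAINERS;
    }
    return Action.NONE; // inside the comfortable band
  }

  public static void main(String[] args) {
    System.out.println(check(0.50f)); // SCHEDULE_CONTAINERS
    System.out.println(check(0.85f)); // NONE
    System.out.println(check(0.98f)); // PREEMPT_CONTAINERS
  }
}

The gap between the two thresholds presumably damps oscillation between launching and preempting the same containers.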


http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 8f72b56..c36dfd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -20,15 +20,20 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerOverallocationPreemptionEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPreemptionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPreemptionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,6 +126,7 @@ public class ContainersMonitorImpl extends AbstractService implements
   private NMAllocationPolicy overAllocationPolicy;
   private ResourceThresholds overAllocationPreemptionThresholds;
   private int overAlloctionPreemptionCpuCount = -1;
+  private NMAllocationPreemptionPolicy overAllocationPreemptionPolicy;
 
   private volatile boolean stopped = false;
 
@@ -375,6 +381,9 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.overAllocationPolicy =
         createOverAllocationPolicy(resourceThresholds);
 
+    this.overAllocationPreemptionPolicy = createOverAllocationPreemptionPolicy(
+        overAllocationPreemptionThresholds, overAlloctionPreemptionCpuCount);
+
     LOG.info("NodeManager oversubscription enabled with overallocation " +
         "thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
         ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -387,6 +396,12 @@ public class ContainersMonitorImpl extends AbstractService implements
     return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
   }
 
+  private NMAllocationPreemptionPolicy createOverAllocationPreemptionPolicy(
+      ResourceThresholds resourceThresholds, int maxTimesCpuOverLimit) {
+    return new SnapshotBasedOverAllocationPreemptionPolicy(
+        resourceThresholds, maxTimesCpuOverLimit, this);
+  }
+
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -673,9 +688,7 @@ public class ContainersMonitorImpl extends AbstractService implements
         setLatestContainersUtilization(trackedContainersUtilization);
 
        // check whether to start or preempt containers if over-allocation is on
-        if (context.isOverAllocationEnabled()) {
-          attemptToStartContainersUponLowUtilization();
-        }
+        checkUtilization();
 
         // Publish the container utilization metrics to node manager
         // metrics system.
@@ -1076,13 +1089,46 @@ public class ContainersMonitorImpl extends AbstractService implements
     return overAllocationPolicy;
   }
 
+  public NMAllocationPreemptionPolicy getOverAllocationPreemptionPolicy() {
+    return overAllocationPreemptionPolicy;
+  }
+
   private void setLatestContainersUtilization(ResourceUtilization utilization) {
     this.latestContainersUtilization = new ContainersResourceUtilization(
-        utilization, System.currentTimeMillis());
+        utilization, Time.now());
   }
 
+  /**
+   * Check the resource utilization of the node. If the utilization is below
+   * the over-allocation threshold, {@link ContainerScheduler} is notified to
+   * launch OPPORTUNISTIC containers that are being queued to bring the
+   * utilization up to the over-allocation threshold. If the utilization
+   * is above the preemption threshold, {@link ContainerScheduler} is notified
+   * to preempt running OPPORTUNISTIC containers in order to bring the node
+   * utilization down to the preemption threshold.
+   * @return true if the utilization is below the over-allocation threshold or
+   *              above the preemption threshold
+   *         false otherwise
+   */
   @VisibleForTesting
-  public void attemptToStartContainersUponLowUtilization() {
+  public boolean checkUtilization() {
+    if (context.isOverAllocationEnabled()) {
+      return checkLowUtilization() || checkHighUtilization();
+    }
+    return false;
+  }
+
+  /**
+   * Check if the node resource utilization is below the over-allocation
+   * threshold. If so, a {@link ContainerSchedulerEvent} is
+   * generated so that OPPORTUNISTIC containers that are being queued can
+   * be launched by {@link ContainerScheduler} with over-allocation and
+   * the node utilization can be brought up to the over-allocation threshold.
+   * @return true if the node utilization is below the over-allocation threshold
+   *         false otherwise
+   */
+  private boolean checkLowUtilization() {
+    boolean opportunisticContainersToStart = false;
     if (getContainerOverAllocationPolicy() != null) {
       Resource available = getContainerOverAllocationPolicy()
           .getAvailableResources();
@@ -1091,8 +1137,37 @@ public class ContainersMonitorImpl extends AbstractService implements
         eventDispatcher.getEventHandler().handle(
             new ContainerSchedulerEvent(null,
                 ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
+        opportunisticContainersToStart = true;
+        LOG.info("Node utilization is below its over-allocation threshold. " +
+            "Inform container scheduler to launch opportunistic containers.");
       }
     }
+    return opportunisticContainersToStart;
+  }
+
+  /**
+   * Check if the node resource utilization is over the preemption threshold.
+   * If so, a {@link ContainerSchedulerOverallocationPreemptionEvent} is
+   * generated so that OPPORTUNISTIC containers can be preempted by
+   * {@link ContainerScheduler} to reclaim resources in order to bring the
+   * node utilization down to the preemption threshold.
+   * @return true if the node utilization is over the preemption threshold
+   *         false otherwise
+   */
+  private boolean checkHighUtilization() {
+    ResourceUtilization overLimit = getOverAllocationPreemptionPolicy()
+        .getResourcesToReclaim();
+
+    boolean opportunisticContainersToPreempt = false;
+
+    if (overLimit.getPhysicalMemory() > 0 || overLimit.getCPU() > 0) {
+      opportunisticContainersToPreempt = true;
+      eventDispatcher.getEventHandler().handle(
+          new ContainerSchedulerOverallocationPreemptionEvent(overLimit));
+      LOG.info("Node utilization is over the preemption threshold. " +
+          "Inform container scheduler to reclaim {}", overLimit);
+    }
+    return opportunisticContainersToPreempt;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
index a3e19eb..7840fef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
@@ -53,6 +53,11 @@ public class AllocationBasedResourceTracker
    */
   @Override
   public ResourceUtilization getCurrentUtilization() {
+    return getTotalAllocation();
+  }
+
+  @Override
+  public ResourceUtilization getTotalAllocation() {
     return this.containersAllocation;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 0d7de67..846efc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -222,12 +222,60 @@ public class ContainerScheduler extends AbstractService implements
       // node utilization is low.
       startOpportunisticContainers(utilizationTracker.getAvailableResources());
       break;
+    case PREEMPT_CONTAINERS:
+      if (event instanceof ContainerSchedulerOverallocationPreemptionEvent) {
+        preemptOpportunisticContainers(
+            (ContainerSchedulerOverallocationPreemptionEvent) event);
+      } else {
+        LOG.error(
+            "Unknown event type on Preempt containers:  {}", event.getType());
+      }
+      break;
     default:
-      LOG.error("Unknown event arrived at ContainerScheduler: "
-          + event.toString());
+      LOG.error("Unknown event arrived at ContainerScheduler: {}", event);
     }
   }
 
+  private void preemptOpportunisticContainers(
+      ContainerSchedulerOverallocationPreemptionEvent event) {
+    ResourceUtilization resourcesToReclaim =
+        getResourcesToReclaim(event.getResourcesOverPreemptionThresholds());
+
+    List<Container> oppContainersToReclaim =
+        pickOpportunisticContainersToReclaimResources(
+            resourcesToReclaim);
+
+    killOpportunisticContainers(oppContainersToReclaim);
+  }
+
+  /**
+   * Get the amount of resources that need to be reclaimed by preempting
+   * OPPORTUNISTIC containers: the smaller of the amount of resources over
+   * the preemption thresholds and the amount allocated beyond the node's
+   * capacity.
+   * When the node is not being over-allocated, its resource utilization
+   * can safely go to 100% without any OPPORTUNISTIC containers being killed.
+   */
+  private ResourceUtilization getResourcesToReclaim(
+      ResourceUtilization resourcesOverPreemptionThresholds) {
+    ResourceUtilization totalAllocation = ResourceUtilization.newInstance(
+        utilizationTracker.getTotalAllocation());
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        totalAllocation);
+    ResourceUtilization overAllocatedResources =
+        ResourceUtilization.newInstance(
+            Math.max(0, totalAllocation.getPhysicalMemory()),
+            Math.max(0, totalAllocation.getVirtualMemory()),
+            Math.max(0, totalAllocation.getCPU()));
+
+    return ResourceUtilization.newInstance(
+        Math.min(overAllocatedResources.getPhysicalMemory(),
+            resourcesOverPreemptionThresholds.getPhysicalMemory()),
+        Math.min(overAllocatedResources.getVirtualMemory(),
+            resourcesOverPreemptionThresholds.getVirtualMemory()),
+        Math.min(overAllocatedResources.getCPU(),
+            resourcesOverPreemptionThresholds.getCPU()));
+  }
+
   /**
    * We assume that the ContainerManager has already figured out what kind
    * of update this is.
@@ -601,8 +649,9 @@ public class ContainerScheduler extends AbstractService implements
 
   @SuppressWarnings("unchecked")
   private void reclaimOpportunisticContainerResources() {
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
     List<Container> extraOppContainersToReclaim =
-        pickOpportunisticContainersToReclaimResources();
+        pickOpportunisticContainersToReclaimResources(resourcesToFreeUp);
     killOpportunisticContainers(extraOppContainersToReclaim);
   }
 
@@ -642,12 +691,12 @@ public class ContainerScheduler extends AbstractService implements
     container.sendLaunchEvent();
   }
 
-  private List<Container> pickOpportunisticContainersToReclaimResources() {
+  private List<Container> pickOpportunisticContainersToReclaimResources(
+      ResourceUtilization resourcesToFreeUp) {
    // The opportunistic containers that need to be killed to free
    // up the requested amount of resources.
     List<Container> extraOpportContainersToKill = new ArrayList<>();
     // Track resources that need to be freed.
-    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
 
     // Go over the running opportunistic containers.
     // Use a descending iterator to kill more recently started containers.
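
The clamping in getResourcesToReclaim() above deserves a worked example: if 9216 MB are allocated on an 8192 MB node (1024 MB over-allocated) and utilization is 500 MB over the preemption threshold, only min(1024, 500) = 500 MB is reclaimed; with no over-allocation, nothing is reclaimed at all. A standalone sketch with illustrative numbers:

// Standalone sketch of the min(over-allocated, over-threshold) clamp in
// ContainerScheduler#getResourcesToReclaim(). Numbers are illustrative.
public class ReclaimClampSketch {
  static int toReclaimMb(int allocatedMb, int capacityMb, int overThresholdMb) {
    int overAllocatedMb = Math.max(0, allocatedMb - capacityMb);
    // Never reclaim more than the amount the node is over-allocated by:
    // without over-allocation, utilization may safely reach 100%.
    return Math.min(overAllocatedMb, overThresholdMb);
  }

  public static void main(String[] args) {
    System.out.println(toReclaimMb(9216, 8192, 500));  // 500: clamped to overage
    System.out.println(toReclaimMb(9216, 8192, 2000)); // 1024: clamped to over-allocation
    System.out.println(toReclaimMb(8192, 8192, 500));  // 0: not over-allocated
  }
}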

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 9ad4f91..f76727d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -30,5 +30,7 @@ public enum ContainerSchedulerEventType {
   CONTAINER_PAUSED,
   RECOVERY_COMPLETED,
   // Producer: Containers Monitor when over-allocation is on
-  SCHEDULE_CONTAINERS
+  SCHEDULE_CONTAINERS,
+  // Producer: Containers Monitor when over-allocation is on
+  PREEMPT_CONTAINERS
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java
new file mode 100644
index 0000000..547036a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+
+/**
+ * A {@link ContainerSchedulerEvent} generated by {@link ContainersMonitorImpl}
+ * when overallocation is turned on and the utilization goes over the
+ * proactive preemption thresholds.
+ */
+public class ContainerSchedulerOverallocationPreemptionEvent
+    extends ContainerSchedulerEvent {
+  private final ResourceUtilization resourcesOverPreemptionThresholds;
+  /**
+   * Create instance of Event.
+   *
+   * @param toFree resource to free up.
+   */
+  public ContainerSchedulerOverallocationPreemptionEvent(
+      ResourceUtilization toFree) {
+    super(null, ContainerSchedulerEventType.PREEMPT_CONTAINERS);
+    this.resourcesOverPreemptionThresholds = toFree;
+  }
+
+  public ResourceUtilization getResourcesOverPreemptionThresholds() {
+    return resourcesOverPreemptionThresholds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..61f6298
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * Keeps track of containers utilization over time and determines how many
+ * resources need to be reclaimed by preempting opportunistic containers
+ * when over-allocation is turned on.
+ */
+public abstract class NMAllocationPreemptionPolicy {
+  private final ResourceThresholds overAllocationPreemptionThresholds;
+  private final ContainersMonitor containersMonitor;
+
+  public NMAllocationPreemptionPolicy(
+      ResourceThresholds preemptionThresholds,
+      ContainersMonitor containersMonitor) {
+    this.containersMonitor = containersMonitor;
+    this.overAllocationPreemptionThresholds = preemptionThresholds;
+  }
+
+  /**
+   * Get the amount of resources to reclaim by preempting opportunistic
+   * containers when over-allocation is turned on.
+   * @return the amount of resources to be reclaimed
+   */
+  public abstract ResourceUtilization getResourcesToReclaim();
+
+  public ContainersMonitor getContainersMonitor() {
+    return containersMonitor;
+  }
+
+  public ResourceThresholds getOverAllocationPreemptionThresholds() {
+    return overAllocationPreemptionThresholds;
+  }
+}
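
NMAllocationPreemptionPolicy is the extension point for alternative reclaim strategies. A minimal sketch of a hypothetical subclass (not part of this patch; note that ContainersMonitorImpl hard-codes the snapshot-based policy in createOverAllocationPreemptionPolicy(), so wiring in another policy would mean changing that factory method):

package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;

/**
 * Hypothetical policy that never requests preemption; it only illustrates
 * the extension surface of NMAllocationPreemptionPolicy.
 */
public class NoopPreemptionPolicy extends NMAllocationPreemptionPolicy {
  public NoopPreemptionPolicy(ResourceThresholds thresholds,
      ContainersMonitor monitor) {
    super(thresholds, monitor);
  }

  @Override
  public ResourceUtilization getResourcesToReclaim() {
    // Nothing over limit => the scheduler preempts no OPPORTUNISTIC containers.
    return ResourceUtilization.newInstance(0, 0, 0.0f);
  }
}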

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 7a7c78e..9be7574 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -39,6 +39,13 @@ public interface ResourceUtilizationTracker {
   ResourceUtilization getCurrentUtilization();
 
   /**
+   * Get the total amount of resources allocated to running containers
+   * in terms of resource utilization.
+   * @return ResourceUtilization resource allocation
+   */
+  ResourceUtilization getTotalAllocation();
+
+  /**
    * Get the amount of resources currently available to launch containers.
    * @return Resource resources available to launch containers
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..188a108
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * An implementation of {@link NMAllocationPreemptionPolicy} based on the
+ * snapshot of the latest containers utilization to determine how many
+ * resources need to be reclaimed by preempting opportunistic containers
+ * when over-allocation is turned on.
+ */
+public class SnapshotBasedOverAllocationPreemptionPolicy
+    extends NMAllocationPreemptionPolicy {
+  private final int absoluteMemoryPreemptionThresholdMb;
+  private final float cpuPreemptionThreshold;
+  private final int maxTimesCpuOverPreemption;
+  private int timesCpuOverPreemption;
+
+  public SnapshotBasedOverAllocationPreemptionPolicy(
+      ResourceThresholds preemptionThresholds,
+      int timesCpuOverPreemptionThreshold,
+      ContainersMonitor containersMonitor) {
+    super(preemptionThresholds, containersMonitor);
+    int memoryCapacityMb = (int)
+        (containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024));
+    this.absoluteMemoryPreemptionThresholdMb = (int)
+        (preemptionThresholds.getMemoryThreshold() * memoryCapacityMb);
+    this.cpuPreemptionThreshold = preemptionThresholds.getCpuThreshold();
+    this.maxTimesCpuOverPreemption = timesCpuOverPreemptionThreshold;
+  }
+
+  @Override
+  public ResourceUtilization getResourcesToReclaim() {
+    ResourceUtilization utilization =
+        getContainersMonitor().getContainersUtilization(true).getUtilization();
+
+    int memoryOverLimit = utilization.getPhysicalMemory() -
+        absoluteMemoryPreemptionThresholdMb;
+    float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold;
+
+    if (vcoreOverLimit > 0) {
+      timesCpuOverPreemption++;
+      if (timesCpuOverPreemption > maxTimesCpuOverPreemption) {
+        timesCpuOverPreemption = 0;
+      } else {
+        // report no CPU over-limit if the number of times CPU has been over
+        // the preemption threshold does not yet exceed the max allowed
+        vcoreOverLimit = 0;
+      }
+    } else {
+      // reset the counter when cpu utilization goes under the preemption
+      // threshold before the max times allowed is reached
+      timesCpuOverPreemption = 0;
+    }
+
+    // sanitize so that zero is returned if the utilization is below
+    // the preemption threshold
+    vcoreOverLimit = Math.max(0, vcoreOverLimit);
+    memoryOverLimit = Math.max(0, memoryOverLimit);
+
+    return ResourceUtilization.newInstance(memoryOverLimit, 0, vcoreOverLimit);
+  }
+}
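
One subtlety in getResourcesToReclaim() above: memory over-limit is reported immediately, while CPU over-limit is debounced — it is only reported once utilization has stayed over the CPU threshold for more than maxTimesCpuOverPreemption consecutive snapshots, presumably because CPU spikes are often transient and CPU is an elastic resource. A standalone sketch of the counter (threshold and readings are illustrative):

// Standalone sketch of the CPU debounce counter in
// SnapshotBasedOverAllocationPreemptionPolicy. Values are illustrative.
public class CpuDebounceSketch {
  static final int MAX_TIMES_OVER = 2; // tolerate 2 consecutive hot snapshots
  static int timesOver = 0;

  static float cpuToReclaim(float cpuUtilization, float threshold) {
    float overLimit = cpuUtilization - threshold;
    if (overLimit > 0) {
      timesOver++;
      if (timesOver > MAX_TIMES_OVER) {
        timesOver = 0;       // report, then start counting afresh
      } else {
        overLimit = 0;       // suppress: not hot for long enough yet
      }
    } else {
      timesOver = 0;         // dipped below threshold: reset the counter
    }
    return Math.max(0, overLimit);
  }

  public static void main(String[] args) {
    float threshold = 0.9f;
    for (float u : new float[] {0.95f, 0.95f, 0.95f, 0.95f, 0.5f, 0.95f}) {
      System.out.printf("util=%.2f reclaim=%.2f%n",
          u, cpuToReclaim(u, threshold));
    }
    // Prints reclaim=0.00, 0.00, 0.05, 0.00, 0.00, 0.00 — only the third
    // consecutive hot snapshot reports CPU to reclaim.
  }
}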

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 92613ed..9f1506a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,7 +26,12 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ContainerSubState;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -559,4 +564,125 @@ public abstract class BaseContainerManagerTest {
         ContainerId.newContainerId(appAttemptId, cId);
     return containerId;
   }
+
+  /**
+   * A test implementation of {@link ContainersMonitor} that allows control of
+   * the current resource utilization.
+   */
+  protected static class ContainerMonitorForTest
+      extends ContainersMonitorImpl {
+    private static final int NM_CONTAINERS_VCORES = 4;
+    private static final int NM_CONTAINERS_MEMORY_MB = 2048;
+
+    private ResourceUtilization containerResourceUsage =
+        ResourceUtilization.newInstance(0, 0, 0.0f);
+
+    ContainerMonitorForTest(ContainerExecutor exec,
+        AsyncDispatcher dispatcher, Context context) {
+      super(exec, dispatcher, context);
+    }
+
+    @Override
+    public long getPmemAllocatedForContainers() {
+      return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
+    }
+
+    @Override
+    public long getVmemAllocatedForContainers() {
+      float pmemRatio = getConfig().getFloat(
+          YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      return (long) (pmemRatio * getPmemAllocatedForContainers());
+    }
+
+    @Override
+    public long getVCoresAllocatedForContainers() {
+      return NM_CONTAINERS_VCORES;
+    }
+
+    @Override
+    public ContainersResourceUtilization getContainersUtilization(
+        boolean latest) {
+      return new ContainersMonitor.ContainersResourceUtilization(
+          containerResourceUsage, Time.now());
+    }
+
+    @Override
+    protected void checkOverAllocationPrerequisites() {
+      // do not check
+    }
+
+    public void setContainerResourceUsage(
+        ResourceUtilization containerResourceUsage) {
+      this.containerResourceUsage = containerResourceUsage;
+    }
+  }
+
+  /**
+   * A test implementation of {@link ContainerManager} that allows its
+   * internal event queue to be drained for synchronization purposes,
+   * and an out-of-band check of the node resource utilization so that
+   * the node utilization can stay between the over-allocation threshold
+   * and the preemption threshold.
+   */
+  protected static class ContainerManagerForTest
+      extends ContainerManagerImpl {
+
+    private final String user;
+
+    public ContainerManagerForTest(
+        Context context, ContainerExecutor exec,
+        DeletionService deletionContext,
+        NodeStatusUpdater nodeStatusUpdater,
+        NodeManagerMetrics metrics,
+        LocalDirsHandlerService dirsHandler, String user) {
+      super(context, exec, deletionContext,
+          nodeStatusUpdater, metrics, dirsHandler);
+      this.user = user;
+    }
+
+    @Override
+    protected UserGroupInformation getRemoteUgi() throws YarnException {
+      ApplicationId appId = ApplicationId.newInstance(0, 0);
+      ApplicationAttemptId appAttemptId =
+          ApplicationAttemptId.newInstance(appId, 1);
+      UserGroupInformation ugi =
+          UserGroupInformation.createRemoteUser(appAttemptId.toString());
+      ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+          .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+          .getKeyId()));
+      return ugi;
+    }
+
+    @Override
+    protected AsyncDispatcher createDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
+    protected ContainersMonitor createContainersMonitor(
+        ContainerExecutor exec) {
+      return new ContainerMonitorForTest(exec, dispatcher, context);
+    }
+
+    /**
+     * Check the node resource utilization out-of-band. If the utilization is
+     * below the over-allocation threshold, queued OPPORTUNISTIC containers
+     * will be launched to bring the node utilization up to the over-allocation
+     * threshold. If the utilization is above the preemption threshold, running
+     * OPPORTUNISTIC containers will be killed to bring the utilization down to
+     * the preemption threshold.
+     */
+    public void checkNodeResourceUtilization() {
+      ((ContainerMonitorForTest) getContainersMonitor()).checkUtilization();
+      drainAsyncEvents();
+    }
+
+    /**
+     * Drain the internal event queue.
+     */
+    public void drainAsyncEvents() {
+      ((DrainDispatcher) dispatcher).await();
+    }
+  }
 }
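
The pattern the tests below use with these helpers: dial in a fake utilization snapshot, force the out-of-band check, then assert on container states once events have drained. A hypothetical convenience helper (not in the patch) that packages it, assuming the surrounding BaseContainerManagerTest fixture:

// Hypothetical helper for subclasses of BaseContainerManagerTest: set a fake
// utilization snapshot, then run the out-of-band utilization check and drain
// the dispatcher before asserting on container states.
protected void simulateUtilizationAndCheck(int pmemMb, int vmemMb, float cpu) {
  ((ContainerMonitorForTest) containerManager.getContainersMonitor())
      .setContainerResourceUsage(
          ResourceUtilization.newInstance(pmemMb, vmemMb, cpu));
  ((ContainerManagerForTest) containerManager)
      .checkNodeResourceUtilization(); // checkUtilization() + drainAsyncEvents()
}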

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 70066c6..d5c09b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
@@ -38,18 +37,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ConfigurationException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
@@ -65,8 +61,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
@@ -131,47 +125,8 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
   @Override
   protected ContainerManagerImpl createContainerManager(
       DeletionService delSrvc) {
-    return new ContainerManagerImpl(context, exec, delSrvc,
-        nodeStatusUpdater, metrics, dirsHandler) {
-
-      @Override
-      protected UserGroupInformation getRemoteUgi() throws YarnException {
-        ApplicationId appId = ApplicationId.newInstance(0, 0);
-        ApplicationAttemptId appAttemptId =
-            ApplicationAttemptId.newInstance(appId, 1);
-        UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(appAttemptId.toString());
-        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
-            .getKeyId()));
-        return ugi;
-      }
-
-      @Override
-      protected ContainersMonitor createContainersMonitor(
-          ContainerExecutor exec) {
-        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
-          // Define resources available for containers to be executed.
-          @Override
-          public long getPmemAllocatedForContainers() {
-            return 2048 * 1024 * 1024L;
-          }
-
-          @Override
-          public long getVmemAllocatedForContainers() {
-            float pmemRatio = getConfig().getFloat(
-                YarnConfiguration.NM_VMEM_PMEM_RATIO,
-                YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-            return (long) (pmemRatio * getPmemAllocatedForContainers());
-          }
-
-          @Override
-          public long getVCoresAllocatedForContainers() {
-            return 4;
-          }
-        };
-      }
-    };
+    return new ContainerManagerForTest(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, dirsHandler, user);
   }
 
   @Override
@@ -393,6 +348,37 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         containerScheduler.getNumQueuedGuaranteedContainers());
     Assert.assertEquals(2,
         containerScheduler.getNumQueuedOpportunisticContainers());
+
+    // We have just one container running; it requested 2048 MB of memory
+    // and 1 vcore, but its actual resource utilization is zero.
+    // Check the node resource utilization: the two OPPORTUNISTIC containers
+    // that are being queued should stay in the queue because over-allocation
+    // is turned off.
+    ((ContainerMonitorForTest) containerManager.getContainersMonitor())
+        .setContainerResourceUsage(
+            ResourceUtilization.newInstance(0, 0, 0.0f));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    ContainerId containerId0 = createContainerId(0);
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(containerId0)) {
+        Assert.assertEquals(ContainerSubState.RUNNING,
+            status.getContainerSubState());
+      } else {
+        Assert.assertEquals(ContainerSubState.SCHEDULED,
+            status.getContainerSubState());
+      }
+    }
+    containerScheduler = containerManager.getContainerScheduler();
+    // Ensure two containers are properly queued.
+    Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(0,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(2,
+        containerScheduler.getNumQueuedOpportunisticContainers());
   }
 
   /**
@@ -476,6 +462,91 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
   }
 
   /**
+   * Start two OPPORTUNISTIC containers which together ask for all
+   * the allocations available on the node. When the node resource
+   * utilization goes over the preemption thresholds, neither of
+   * the containers should be preempted because there is no
+   * over-allocation at the moment and they can safely use up all
+   * the resources available on the node.
+   */
+  @Test
+  public void testNoOpportunisticContainerPreemptionUponHighUtilization()
+      throws Exception {
+    containerManager.start();
+
+    // Start two OPPORTUNISTIC containers that together take up
+    // all allocations of the node. The two can be launched immediately
+    // because there is enough free allocation. When they use up
+    // all their resource allocations, that is, the node is fully
+    // utilized, none of the OPPORTUNISTIC containers shall be killed
+    // because the node is not being over-allocated.
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        recordFactory.newRecordInstance(ContainerLaunchContext.class),
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 2),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        recordFactory.newRecordInstance(ContainerLaunchContext.class),
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 2),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    // the two OPPORTUNISTIC containers shall be launched immediately
+    // because there is just enough allocation to launch them both.
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(0),
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(1),
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
+
+    // Ensure all containers are running.
+    List<ContainerId> statList = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+    }
+
+    // we have two containers running, both of which are using all of
+    // their allocations. The node is being fully utilized in terms
+    // of both memory and CPU
+    ((ContainerMonitorForTest) containerManager.getContainersMonitor())
+        .setContainerResourceUsage(
+            ResourceUtilization.newInstance(2048, 0, 1.0f));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // The two running OPPORTUNISTIC containers shall continue to run:
+    // when the node is not being over-allocated, it is safe to let
+    // the containers use up all the resources, so no OPPORTUNISTIC
+    // containers shall be preempted.
+    containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+    }
+  }
+
+  /**
   * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resource
   * requests by each container are such that only one can run in parallel.
   * Thus, the OPPORTUNISTIC container that started running will be

