You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:08:02 UTC

[40/50] [abbrv] hadoop git commit: YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen.

YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08116cc4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08116cc4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08116cc4

Branch: refs/heads/YARN-1011
Commit: 08116cc419126cfecb886aab62642de817a4b885
Parents: 0067800
Author: Miklos Szegedi <sz...@apache.org>
Authored: Fri Apr 20 14:15:29 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Sep 21 17:19:25 2018 -0700

----------------------------------------------------------------------
 .../nodemanager/NodeStatusUpdaterImpl.java      |    2 +-
 .../containermanager/ContainerManagerImpl.java  |    8 +-
 .../launcher/ContainerLaunch.java               |    2 +-
 .../launcher/ContainersLauncher.java            |    9 +-
 .../monitor/ContainersMonitor.java              |   38 +-
 .../monitor/ContainersMonitorImpl.java          |   56 +-
 .../AllocationBasedResourceTracker.java         |  114 ++
 ...locationBasedResourceUtilizationTracker.java |  158 ---
 .../scheduler/ContainerScheduler.java           |  317 +++--
 .../scheduler/ContainerSchedulerEventType.java  |    4 +-
 .../scheduler/NMAllocationPolicy.java           |   63 +
 .../scheduler/ResourceUtilizationTracker.java   |   17 +-
 .../SnapshotBasedOverAllocationPolicy.java      |   54 +
 .../UtilizationBasedResourceTracker.java        |   95 ++
 .../BaseContainerManagerTest.java               |   35 +
 .../TestContainersMonitorResourceChange.java    |    9 +-
 .../TestAllocationBasedResourceTracker.java     |   82 ++
 ...locationBasedResourceUtilizationTracker.java |   93 --
 .../TestContainerSchedulerRecovery.java         |   58 +-
 ...estContainerSchedulerWithOverAllocation.java | 1121 ++++++++++++++++++
 20 files changed, 1916 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 572684e..d757376 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -543,7 +543,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private ResourceUtilization getContainersUtilization() {
     ContainersMonitor containersMonitor =
         this.context.getContainerManager().getContainersMonitor();
-    return containersMonitor.getContainersUtilization();
+    return containersMonitor.getContainersUtilization(false).getUtilization();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 27a7c80..a08e227 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -245,6 +245,12 @@ public class ContainerManagerImpl extends CompositeService implements
             metrics);
     addService(rsrcLocalizationSrvc);
 
+    this.containersMonitor = createContainersMonitor(exec);
+    addService(this.containersMonitor);
+
+    // ContainersLauncher must be added after ContainersMonitor
+    // because the former depends on the latter to initialize
+    // over-allocation first.
     containersLauncher = createContainersLauncher(context, exec);
     addService(containersLauncher);
 
@@ -269,8 +275,6 @@ public class ContainerManagerImpl extends CompositeService implements
       nmMetricsPublisher = createNMTimelinePublisher(context);
       context.setNMTimelinePublisher(nmMetricsPublisher);
     }
-    this.containersMonitor = createContainersMonitor(exec);
-    addService(this.containersMonitor);
 
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 6347d4e..0228332 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1079,7 +1079,7 @@ public class ContainerLaunch implements Callable<Integer> {
    * @return Process ID
    * @throws Exception
    */
-  private String getContainerPid(Path pidFilePath) throws Exception {
+  protected String getContainerPid(Path pidFilePath) throws Exception {
     String containerIdStr = 
         container.getContainerId().toString();
     String processId = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 7870f86..2f5acfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -121,8 +121,7 @@ public class ContainersLauncher extends AbstractService
               containerId.getApplicationAttemptId().getApplicationId());
 
         ContainerLaunch launch =
-            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
-              event.getContainer(), dirsHandler, containerManager);
+            createContainerLaunch(app, event.getContainer());
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
@@ -225,4 +224,10 @@ public class ContainersLauncher extends AbstractService
         break;
     }
   }
+
+  protected ContainerLaunch createContainerLaunch(
+      Application app, Container container) {
+    return new ContainerLaunch(context, getConfig(), dispatcher,
+        exec, app, container, dirsHandler, containerManager);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 64831e9..8da4ec4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -23,10 +23,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
-  ResourceUtilization getContainersUtilization();
+
+  /**
+   * Get the aggregate resource utilization of containers running on the node,
+   * with a timestamp of the measurement.
+   * @param latest true if the latest result should be returned
+   * @return the aggregate utilization of all containers and its timestamp
+   */
+  ContainersResourceUtilization getContainersUtilization(boolean latest);
+
+  /**
+   * Get the policy to over-allocate containers when over-allocation is on.
+   * @return null if over-allocation is turned off
+   */
+  NMAllocationPolicy getContainerOverAllocationPolicy();
 
   float getVmemRatio();
 
@@ -66,4 +80,26 @@ public interface ContainersMonitor extends Service,
         * containersMonitor.getVmemRatio());
     resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
   }
+
+  /**
+   * A snapshot of resource utilization of all containers with the timestamp.
+   */
+  final class ContainersResourceUtilization {
+    private final ResourceUtilization utilization;
+    private final long timestamp;
+
+    public ContainersResourceUtilization(
+        ResourceUtilization utilization, long timestamp) {
+      this.utilization = utilization;
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public ResourceUtilization getUtilization() {
+      return utilization;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 7873882..a045d78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,8 +116,9 @@ public class ContainersMonitorImpl extends AbstractService implements
     CPU, MEMORY
   }
 
-  private ResourceUtilization containersUtilization;
+  private ContainersResourceUtilization latestContainersUtilization;
 
+  private NMAllocationPolicy overAllocationPolicy;
   private ResourceThresholds overAllocationPreemptionThresholds;
   private int overAlloctionPreemptionCpuCount = -1;
 
@@ -129,7 +134,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     this.monitoringThread = new MonitoringThread();
 
-    this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.latestContainersUtilization = new ContainersResourceUtilization(
+        ResourceUtilization.newInstance(-1, -1, -1.0f), -1L);
   }
 
   @Override
@@ -365,6 +371,10 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance(
         cpuPreemptionThreshold, memoryPreemptionThreshold);
 
+    // TODO make this configurable
+    this.overAllocationPolicy =
+        createOverAllocationPolicy(resourceThresholds);
+
     LOG.info("NodeManager oversubscription enabled with overallocation " +
         "thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
         ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -372,6 +382,11 @@ public class ContainersMonitorImpl extends AbstractService implements
         cpuPreemptionThreshold + ")");
   }
 
+  protected NMAllocationPolicy createOverAllocationPolicy(
+      ResourceThresholds resourceThresholds) {
+    return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
+  }
+
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -655,7 +670,12 @@ public class ContainersMonitorImpl extends AbstractService implements
         }
 
         // Save the aggregated utilization of the containers
-        setContainersUtilization(trackedContainersUtilization);
+        setLatestContainersUtilization(trackedContainersUtilization);
+
+        // check opportunity to start containers if over-allocation is on
+        if (context.isOverAllocationEnabled()) {
+          attemptToStartContainersUponLowUtilization();
+        }
 
         // Publish the container utilization metrics to node manager
         // metrics system.
@@ -1045,12 +1065,34 @@ public class ContainersMonitorImpl extends AbstractService implements
   }
 
   @Override
-  public ResourceUtilization getContainersUtilization() {
-    return this.containersUtilization;
+  public ContainersResourceUtilization getContainersUtilization(
+      boolean latest) {
+    // TODO update latestContainersUtilization if latest is true
+    return this.latestContainersUtilization;
+  }
+
+  @Override
+  public NMAllocationPolicy getContainerOverAllocationPolicy() {
+    return overAllocationPolicy;
+  }
+
+  private void setLatestContainersUtilization(ResourceUtilization utilization) {
+    this.latestContainersUtilization = new ContainersResourceUtilization(
+        utilization, System.currentTimeMillis());
   }
 
-  private void setContainersUtilization(ResourceUtilization utilization) {
-    this.containersUtilization = utilization;
+  @VisibleForTesting
+  public void attemptToStartContainersUponLowUtilization() {
+    if (getContainerOverAllocationPolicy() != null) {
+      Resource available = getContainerOverAllocationPolicy()
+          .getAvailableResources();
+      if (available.getMemorySize() > 0 &&
+          available.getVirtualCores() > 0) {
+        eventDispatcher.getEventHandler().handle(
+            new ContainerSchedulerEvent(null,
+                ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
new file mode 100644
index 0000000..86b3698
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the resource utilization tracker that equates
+ * resource utilization with the total resources allocated to containers.
+ */
+public class AllocationBasedResourceTracker
+    implements ResourceUtilizationTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
+
+  private static final Resource UNAVAILABLE =
+      Resource.newInstance(0, 0);
+
+  private ResourceUtilization containersAllocation;
+  private ContainerScheduler scheduler;
+
+
+  AllocationBasedResourceTracker(ContainerScheduler scheduler) {
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Get the total resources currently allocated to containers.
+   * @return ResourceUtilization the accumulated container allocation.
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * Get the amount of resources that have not been allocated to containers.
+   * @return Resource resources that have not been allocated to containers.
+   */
+  protected Resource getUnallocatedResources() {
+    // unallocated resources = node capacity - containers allocation
+    // = -(container allocation - node capacity)
+    ResourceUtilization allocationClone =
+        ResourceUtilization.newInstance(containersAllocation);
+    getContainersMonitor()
+        .subtractNodeResourcesFromResourceUtilization(allocationClone);
+
+    Resource unallocated = UNAVAILABLE;
+    if (allocationClone.getCPU() <= 0 &&
+        allocationClone.getPhysicalMemory() <= 0 &&
+        allocationClone.getVirtualMemory() <= 0) {
+      int cpu = Math.round(allocationClone.getCPU() *
+          getContainersMonitor().getVCoresAllocatedForContainers());
+      long memory = allocationClone.getPhysicalMemory();
+      unallocated = Resource.newInstance(-memory, -cpu);
+    }
+    return unallocated;
+  }
+
+
+  @Override
+  public Resource getAvailableResources() {
+    return getUnallocatedResources();
+  }
+
+  /**
+   * Add Container's resources to the accumulated allocation.
+   * @param container Container.
+   */
+  @Override
+  public void containerLaunched(Container container) {
+    ContainersMonitor.increaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Subtract Container's resources from the accumulated allocation.
+   * @param container Container.
+   */
+  @Override
+  public void containerReleased(Container container) {
+    ContainersMonitor.decreaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  public ContainersMonitor getContainersMonitor() {
+    return this.scheduler.getContainersMonitor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
deleted file mode 100644
index 6e2b617..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the {@link ResourceUtilizationTracker} that equates
- * resource utilization with the total resource allocated to the container.
- */
-public class AllocationBasedResourceUtilizationTracker implements
-    ResourceUtilizationTracker {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
-
-  private ResourceUtilization containersAllocation;
-  private ContainerScheduler scheduler;
-
-  AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
-    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
-    this.scheduler = scheduler;
-  }
-
-  /**
-   * Get the accumulation of totally allocated resources to a container.
-   * @return ResourceUtilization Resource Utilization.
-   */
-  @Override
-  public ResourceUtilization getCurrentUtilization() {
-    return this.containersAllocation;
-  }
-
-  /**
-   * Add Container's resources to the accumulated Utilization.
-   * @param container Container.
-   */
-  @Override
-  public void addContainerResources(Container container) {
-    ContainersMonitor.increaseResourceUtilization(
-        getContainersMonitor(), this.containersAllocation,
-        container.getResource());
-  }
-
-  /**
-   * Subtract Container's resources to the accumulated Utilization.
-   * @param container Container.
-   */
-  @Override
-  public void subtractContainerResource(Container container) {
-    ContainersMonitor.decreaseResourceUtilization(
-        getContainersMonitor(), this.containersAllocation,
-        container.getResource());
-  }
-
-  /**
-   * Check if NM has resources available currently to run the container.
-   * @param container Container.
-   * @return True, if NM has resources available currently to run the container.
-   */
-  @Override
-  public boolean hasResourcesAvailable(Container container) {
-    long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
-    return hasResourcesAvailable(pMemBytes,
-        (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
-        container.getResource().getVirtualCores());
-  }
-
-  private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
-      int cpuVcores) {
-    // Check physical memory.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
-          this.containersAllocation.getPhysicalMemory(),
-          (pMemBytes >> 20),
-          (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
-    }
-    if (this.containersAllocation.getPhysicalMemory() +
-        (int) (pMemBytes >> 20) >
-        (int) (getContainersMonitor()
-            .getPmemAllocatedForContainers() >> 20)) {
-      return false;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("before vMemCheck" +
-              "[isEnabled={}, current={} + asked={} > allowed={}]",
-          getContainersMonitor().isVmemCheckEnabled(),
-          this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
-          (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
-    }
-    // Check virtual memory.
-    if (getContainersMonitor().isVmemCheckEnabled() &&
-        this.containersAllocation.getVirtualMemory() +
-            (int) (vMemBytes >> 20) >
-            (int) (getContainersMonitor()
-                .getVmemAllocatedForContainers() >> 20)) {
-      return false;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("before cpuCheck [asked={} > allowed={}]",
-          this.containersAllocation.getCPU(),
-          getContainersMonitor().getVCoresAllocatedForContainers());
-    }
-    // Check CPU. Compare using integral values of cores to avoid decimal
-    // inaccuracies.
-    if (!hasEnoughCpu(this.containersAllocation.getCPU(),
-        getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Returns whether there is enough space for coresRequested in totalCores.
-   * Converts currentAllocation usage to nearest integer count before comparing,
-   * as floats are inherently imprecise. NOTE: this calculation assumes that
-   * requested core counts must be integers, and currentAllocation core count
-   * must also be an integer.
-   *
-   * @param currentAllocation The current allocation, a float value from 0 to 1.
-   * @param totalCores The total cores in the system.
-   * @param coresRequested The number of cores requested.
-   * @return True if currentAllocationtotalCores*coresRequested &lt;=
-   *         totalCores.
-   */
-  public boolean hasEnoughCpu(float currentAllocation, long totalCores,
-      int coresRequested) {
-    // Must not cast here, as it would truncate the decimal digits.
-    return Math.round(currentAllocation * totalCores)
-        + coresRequested <= totalCores;
-  }
-
-  public ContainersMonitor getContainersMonitor() {
-    return this.scheduler.getContainersMonitor();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index a61b9d1..0bebe44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
         .RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +82,10 @@ public class ContainerScheduler extends AbstractService implements
   // Queue of Guaranteed Containers waiting for resources to run
   private final LinkedHashMap<ContainerId, Container>
       queuedGuaranteedContainers = new LinkedHashMap<>();
+  // sum of the resources requested by guaranteed containers in queue
+  private final Resource guaranteedResourcesDemanded =
+      Resource.newInstance(0, 0);
+
   // Queue of Opportunistic Containers waiting for resources to run
   private final LinkedHashMap<ContainerId, Container>
       queuedOpportunisticContainers = new LinkedHashMap<>();
@@ -88,6 +94,10 @@ public class ContainerScheduler extends AbstractService implements
   // or paused to make room for a guaranteed container.
   private final Map<ContainerId, Container> oppContainersToKill =
       new HashMap<>();
+  // sum of the resources to be released by opportunistic containers that
+  // have been marked to be killed or paused.
+  private final Resource opportunisticResourcesToBeReleased =
+      Resource.newInstance(0, 0);
 
   // Containers launched by the Scheduler will take a while to actually
   // move to the RUNNING state, but should still be fair game for killing
@@ -128,6 +138,17 @@ public class ContainerScheduler extends AbstractService implements
             DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
   }
 
+  @VisibleForTesting
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics, int qLength) {
+    super(ContainerScheduler.class.getName());
+    this.context = context;
+    this.dispatcher = dispatcher;
+    this.metrics = metrics;
+    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+    this.opportunisticContainersStatus =
+        OpportunisticContainersStatus.newInstance();
+  }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
@@ -155,20 +176,16 @@ public class ContainerScheduler extends AbstractService implements
             YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
             YarnConfiguration.
                 DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
+    // We assume over allocation configurations have been initialized
+    this.utilizationTracker = getResourceTracker();
   }
 
-  @VisibleForTesting
-  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
-      NodeManagerMetrics metrics, int qLength) {
-    super(ContainerScheduler.class.getName());
-    this.context = context;
-    this.dispatcher = dispatcher;
-    this.metrics = metrics;
-    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
-    this.utilizationTracker =
-        new AllocationBasedResourceUtilizationTracker(this);
-    this.opportunisticContainersStatus =
-        OpportunisticContainersStatus.newInstance();
+  private AllocationBasedResourceTracker getResourceTracker() {
+    if (context.isOverAllocationEnabled()) {
+      return new UtilizationBasedResourceTracker(this);
+    } else {
+      return new AllocationBasedResourceTracker(this);
+    }
   }
 
   /**
@@ -191,14 +208,18 @@ public class ContainerScheduler extends AbstractService implements
       if (event instanceof UpdateContainerSchedulerEvent) {
         onUpdateContainer((UpdateContainerSchedulerEvent) event);
       } else {
-        LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
+        LOG.error("Unknown event type on UpdateContainer: " + event.getType());
       }
       break;
     case SHED_QUEUED_CONTAINERS:
       shedQueuedOpportunisticContainers();
       break;
     case RECOVERY_COMPLETED:
-      startPendingContainers(maxOppQueueLength <= 0);
+      startPendingContainers(false);
+      break;
+    case SCHEDULE_CONTAINERS:
+      startPendingContainers(true);
+      break;
     default:
       LOG.error("Unknown event arrived at ContainerScheduler: "
           + event.toString());
@@ -213,10 +234,10 @@ public class ContainerScheduler extends AbstractService implements
     ContainerId containerId = updateEvent.getContainer().getContainerId();
     if (updateEvent.isResourceChange()) {
       if (runningContainers.containsKey(containerId)) {
-        this.utilizationTracker.subtractContainerResource(
+        this.utilizationTracker.containerReleased(
             new ContainerImpl(getConfig(), null, null, null, null,
                 updateEvent.getOriginalToken(), context));
-        this.utilizationTracker.addContainerResources(
+        this.utilizationTracker.containerLaunched(
             updateEvent.getContainer());
         getContainersMonitor().handle(
             new ChangeMonitoringContainerResourceEvent(containerId,
@@ -232,17 +253,20 @@ public class ContainerScheduler extends AbstractService implements
         if (queuedOpportunisticContainers.remove(containerId) != null) {
           queuedGuaranteedContainers.put(containerId,
               updateEvent.getContainer());
-          //Kill/pause opportunistic containers if any to make room for
-          // promotion request
-          reclaimOpportunisticContainerResources(updateEvent.getContainer());
+          Resources.addTo(guaranteedResourcesDemanded,
+              updateEvent.getContainer().getResource());
+          startPendingContainers(true);
         }
       } else {
         // Demotion of queued container.. Should not happen too often
         // since you should not find too many queued guaranteed
         // containers
         if (queuedGuaranteedContainers.remove(containerId) != null) {
+          Resources.subtractFrom(guaranteedResourcesDemanded,
+              updateEvent.getContainer().getResource());
           queuedOpportunisticContainers.put(containerId,
               updateEvent.getContainer());
+          startPendingContainers(false);
         }
       }
       try {
@@ -269,6 +293,7 @@ public class ContainerScheduler extends AbstractService implements
         || rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
       if (execType == ExecutionType.GUARANTEED) {
         queuedGuaranteedContainers.put(container.getContainerId(), container);
+        Resources.addTo(guaranteedResourcesDemanded, container.getResource());
       } else if (execType == ExecutionType.OPPORTUNISTIC) {
         queuedOpportunisticContainers
             .put(container.getContainerId(), container);
@@ -279,7 +304,7 @@ public class ContainerScheduler extends AbstractService implements
       }
     } else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
       runningContainers.put(container.getContainerId(), container);
-      utilizationTracker.addContainerResources(container);
+      utilizationTracker.containerLaunched(container);
     }
     if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
             && rcs.getCapability() != null) {
@@ -344,65 +369,107 @@ public class ContainerScheduler extends AbstractService implements
   }
 
   private void onResourcesReclaimed(Container container) {
-    oppContainersToKill.remove(container.getContainerId());
+    ContainerId containerId = container.getContainerId();
+
+    // The container may have been killed externally, e.g. by the
+    // ContainerManager, while it was still in the OPPORTUNISTIC queue.
+    if (queuedOpportunisticContainers.remove(containerId) != null) {
+      return;
+    }
 
     // This could be killed externally for eg. by the ContainerManager,
     // in which case, the container might still be queued.
-    Container queued =
-        queuedOpportunisticContainers.remove(container.getContainerId());
-    if (queued == null) {
-      queuedGuaranteedContainers.remove(container.getContainerId());
+    if (queuedGuaranteedContainers.remove(containerId) != null) {
+      Resources.addTo(guaranteedResourcesDemanded, container.getResource());
+      return;
+    }
+
+    if (oppContainersToKill.remove(containerId) != null) {
+      Resources.subtractFrom(
+          opportunisticResourcesToBeReleased, container.getResource());
     }
 
     // Requeue PAUSED containers
     if (container.getContainerState() == ContainerState.PAUSED) {
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.put(container.getContainerId(), container);
+        queuedGuaranteedContainers.put(containerId, container);
+        Resources.addTo(guaranteedResourcesDemanded, container.getResource());
       } else {
-        queuedOpportunisticContainers.put(
-            container.getContainerId(), container);
+        queuedOpportunisticContainers.put(containerId, container);
       }
     }
     // decrement only if it was a running container
-    Container completedContainer = runningContainers.remove(container
-        .getContainerId());
+    Container completedContainer = runningContainers.remove(containerId);
     // only a running container releases resources upon completion
     boolean resourceReleased = completedContainer != null;
     if (resourceReleased) {
-      this.utilizationTracker.subtractContainerResource(container);
+      this.utilizationTracker.containerReleased(container);
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.OPPORTUNISTIC) {
         this.metrics.completeOpportunisticContainer(container.getResource());
       }
-      boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
-      startPendingContainers(forceStartGuaranteedContainers);
+
+      // In case of over-allocation being turned on, we may need to reclaim
+      // more resources since the opportunistic containers that have been
+      // killed or paused may not have released as many resources as we need.
+      boolean reclaimOpportunisticResources = context.isOverAllocationEnabled();
+      startPendingContainers(reclaimOpportunisticResources);
     }
   }
 
   /**
    * Start pending containers in the queue.
-   * @param forceStartGuaranteedContaieners When this is true, start guaranteed
-   *        container without looking at available resource
+   * @param reclaimOpportunisticResources if set to true, resources allocated
+   *                  to running OPPORTUNISTIC containers will be reclaimed in
+   *                  cases where there are GUARANTEED containers being queued
    */
-  private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
-    // Start guaranteed containers that are paused, if resources available.
-    boolean resourcesAvailable = startContainers(
-          queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
-    // Start opportunistic containers, if resources available.
-    if (resourcesAvailable) {
-      startContainers(queuedOpportunisticContainers.values(), false);
+  private void startPendingContainers(boolean reclaimOpportunisticResources) {
+    // When opportunistic containers are not allowed (determined by the
+    // max-queue length of pending opportunistic containers <= 0), start
+    // guaranteed containers without looking at available resources and
+    // skip scanning the queue of opportunistic containers
+    if (maxOppQueueLength <= 0) {
+      forcefullyStartGuaranteedContainers();
+      return;
+    }
+
+    Resource available = utilizationTracker.getAvailableResources();
+
+    // Start guaranteed containers that are queued, if resources available.
+    boolean allGuaranteedContainersLaunched =
+        startGuaranteedContainers(available);
+    // Start opportunistic containers, if resources available, which is true
+    // if all guaranteed containers in queue have been launched.
+    if (allGuaranteedContainersLaunched) {
+      startOpportunisticContainers(available);
+    } else {
+      // If not all guaranteed containers in queue are launched, we may need
+      // to reclaim resources from opportunistic containers that are running.
+      if (reclaimOpportunisticResources) {
+        reclaimOpportunisticContainerResources();
+      }
     }
   }
 
-  private boolean startContainers(
-      Collection<Container> containersToBeStarted, boolean force) {
-    Iterator<Container> cIter = containersToBeStarted.iterator();
+  /**
+   * Try to launch as many GUARANTEED containers as possible.
+   * @param available the amount of resources available to launch containers
+   * @return true if all queued GUARANTEED containers are launched
+   *              or there are no GUARANTEED containers to launch
+   */
+  private boolean startGuaranteedContainers(Resource available) {
+    Iterator<Container> cIter =
+        queuedGuaranteedContainers.values().iterator();
     boolean resourcesAvailable = true;
     while (cIter.hasNext() && resourcesAvailable) {
       Container container = cIter.next();
-      if (tryStartContainer(container, force)) {
+      if (isResourceAvailable(available, container)) {
+        startContainer(container);
+        Resources.subtractFrom(available, container.getResource());
         cIter.remove();
+        Resources.subtractFrom(
+            guaranteedResourcesDemanded, container.getResource());
       } else {
         resourcesAvailable = false;
       }
@@ -410,25 +477,49 @@ public class ContainerScheduler extends AbstractService implements
     return resourcesAvailable;
   }
 
-  private boolean tryStartContainer(Container container, boolean force) {
-    boolean containerStarted = false;
-    // call startContainer without checking available resource when force==true
-    if (force || resourceAvailableToStartContainer(
-        container)) {
+  /**
+   * Launch all queued GUARANTEED containers without checking resource
+   * availability. This is an optimization in cases where OPPORTUNISTIC
+   * containers are not allowed on the node.
+   */
+  private void forcefullyStartGuaranteedContainers() {
+    Iterator<Container> cIter =
+        queuedGuaranteedContainers.values().iterator();
+    while (cIter.hasNext()) {
+      Container container = cIter.next();
       startContainer(container);
-      containerStarted = true;
+      cIter.remove();
+      Resources.subtractFrom(
+          guaranteedResourcesDemanded, container.getResource());
     }
-    return containerStarted;
   }
-
   /**
-   * Check if there is resource available to start a given container
-   * immediately. (This can be extended to include overallocated resources)
-   * @param container the container to start
-   * @return true if container can be launched directly
+   * Try to launch as many OPPORTUNISTIC containers as possible.
+   * @param available the amount of resources available to launch containers
+   * @return true if all OPPORTUNISTIC containers are launched
+   *              or there are no OPPORTUNISTIC containers to launch
    */
-  private boolean resourceAvailableToStartContainer(Container container) {
-    return this.utilizationTracker.hasResourcesAvailable(container);
+  private boolean startOpportunisticContainers(Resource available) {
+    Iterator<Container> cIter =
+        queuedOpportunisticContainers.values().iterator();
+    boolean resourcesAvailable = true;
+    while (cIter.hasNext() && resourcesAvailable) {
+      Container container = cIter.next();
+      if (isResourceAvailable(available, container)) {
+        startContainer(container);
+        Resources.subtractFrom(available, container.getResource());
+        cIter.remove();
+      } else {
+        resourcesAvailable = false;
+      }
+    }
+    return resourcesAvailable;
+  }
+
+  private static boolean isResourceAvailable(
+      Resource resource, Container container) {
+    Resource left = Resources.subtract(resource, container.getResource());
+    return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0;
   }
 
   private boolean enqueueContainer(Container container) {
@@ -438,6 +529,7 @@ public class ContainerScheduler extends AbstractService implements
     boolean isQueued;
     if (isGuaranteedContainer) {
       queuedGuaranteedContainers.put(container.getContainerId(), container);
+      Resources.addTo(guaranteedResourcesDemanded, container.getResource());
       isQueued = true;
     } else {
       if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
@@ -482,18 +574,7 @@ public class ContainerScheduler extends AbstractService implements
     // enough number of opportunistic containers.
     if (isGuaranteedContainer) {
       enqueueContainer(container);
-
-      // When opportunistic container not allowed (which is determined by
-      // max-queue length of pending opportunistic containers <= 0), start
-      // guaranteed containers without looking at available resources.
-      boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
-      startPendingContainers(forceStartGuaranteedContainers);
-
-      // if the guaranteed container is queued, we need to preempt opportunistic
-      // containers for make room for it
-      if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
-        reclaimOpportunisticContainerResources(container);
-      }
+      startPendingContainers(true);
     } else {
       // Given an opportunistic container, we first try to start as many queuing
       // guaranteed containers as possible followed by queuing opportunistic
@@ -511,19 +592,19 @@ public class ContainerScheduler extends AbstractService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void reclaimOpportunisticContainerResources(Container container) {
+  private void reclaimOpportunisticContainerResources() {
     List<Container> extraOppContainersToReclaim =
-        pickOpportunisticContainersToReclaimResources(
-            container.getContainerId());
-    // Kill the opportunistic containers that were chosen.
-    for (Container contToReclaim : extraOppContainersToReclaim) {
+        pickOpportunisticContainersToReclaimResources();
+    killOpportunisticContainers(extraOppContainersToReclaim);
+  }
+
+  private void killOpportunisticContainers(
+      Collection<Container> containersToReclaim) {
+    for (Container contToReclaim : containersToReclaim) {
       String preemptionAction = usePauseEventForPreemption == true ? "paused" :
-          "resumed";
-      LOG.info(
-          "Container {} will be {} to start the "
-              + "execution of guaranteed container {}.",
-          contToReclaim.getContainerId(), preemptionAction,
-          container.getContainerId());
+          "preempted";
+      LOG.info("Container {} will be {} to start the execution of guaranteed" +
+              " containers.", contToReclaim.getContainerId(), preemptionAction);
 
       if (usePauseEventForPreemption) {
         contToReclaim.sendPauseEvent(
@@ -534,6 +615,8 @@ public class ContainerScheduler extends AbstractService implements
             "Container Killed to make room for Guaranteed Container.");
       }
       oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
+      Resources.addTo(
+          opportunisticResourcesToBeReleased, contToReclaim.getResource());
     }
   }
 
@@ -542,7 +625,7 @@ public class ContainerScheduler extends AbstractService implements
     // Skip to put into runningContainers and addUtilization when recover
     if (!runningContainers.containsKey(container.getContainerId())) {
       runningContainers.put(container.getContainerId(), container);
-      this.utilizationTracker.addContainerResources(container);
+      this.utilizationTracker.containerLaunched(container);
     }
     if (container.getContainerTokenIdentifier().getExecutionType() ==
         ExecutionType.OPPORTUNISTIC) {
@@ -551,14 +634,12 @@ public class ContainerScheduler extends AbstractService implements
     container.sendLaunchEvent();
   }
 
-  private List<Container> pickOpportunisticContainersToReclaimResources(
-      ContainerId containerToStartId) {
+  private List<Container> pickOpportunisticContainersToReclaimResources() {
     // The opportunistic containers that need to be killed for the
     // given container to start.
     List<Container> extraOpportContainersToKill = new ArrayList<>();
     // Track resources that need to be freed.
-    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
-        containerToStartId);
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
 
     // Go over the running opportunistic containers.
     // Use a descending iterator to kill more recently started containers.
@@ -577,15 +658,19 @@ public class ContainerScheduler extends AbstractService implements
           continue;
         }
         extraOpportContainersToKill.add(runningCont);
+        // In the case of over-allocation, the running container may not
+        // release as many resources as it has requested, but we'll check
+        // again if more containers need to be killed/paused when this
+        // container is released.
         ContainersMonitor.decreaseResourceUtilization(
             getContainersMonitor(), resourcesToFreeUp,
             runningCont.getResource());
       }
     }
     if (!hasSufficientResources(resourcesToFreeUp)) {
-      LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
-          "at the moment. Opportunistic containers are in the process of" +
-          "being killed to make room.", containerToStartId);
+      LOG.warn("There are no sufficient resources to start guaranteed" +
+          " containers at the moment. Opportunistic containers are in" +
+          " the process of being killed to make room.");
     }
     return extraOpportContainersToKill;
   }
@@ -600,34 +685,42 @@ public class ContainerScheduler extends AbstractService implements
             * getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
   }
 
-  private ResourceUtilization resourcesToFreeUp(
-      ContainerId containerToStartId) {
+  /**
+   * Determine how many resources need to be freed up to launch all queued
+   * GUARANTEED containers. Used to determine how many running OPPORTUNISTIC
+   * containers need to be killed/paused, assuming OPPORTUNISTIC containers to
+   * be killed/paused will release the amount of resources they have requested.
+   *
+   * If the node is over-allocating itself, this may cause not enough
+   * OPPORTUNISTIC containers being killed/paused in cases where the running
+   * OPPORTUNISTIC containers are not consuming fully their resource requests.
+   * We'd check again upon container completion events to see if more running
+   * OPPORTUNISTIC containers need to be killed/paused.
+   *
+   * @return the amount of resources that still need to be reclaimed
+   */
+  private ResourceUtilization resourcesToFreeUp() {
     // Get allocation of currently allocated containers.
     ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
-        .newInstance(this.utilizationTracker.getCurrentUtilization());
-
-    // Add to the allocation the allocation of the pending guaranteed
-    // containers that will start before the current container will be started.
-    for (Container container : queuedGuaranteedContainers.values()) {
-      ContainersMonitor.increaseResourceUtilization(
-          getContainersMonitor(), resourceAllocationToFreeUp,
-          container.getResource());
-      if (container.getContainerId().equals(containerToStartId)) {
-        break;
-      }
-    }
+        .newInstance(0, 0, 0.0f);
+
+    // Add to the allocation the allocation of pending guaranteed containers.
+    ContainersMonitor.increaseResourceUtilization(getContainersMonitor(),
+        resourceAllocationToFreeUp, guaranteedResourcesDemanded);
 
     // These resources are being freed, likely at the behest of another
     // guaranteed container..
-    for (Container container : oppContainersToKill.values()) {
-      ContainersMonitor.decreaseResourceUtilization(
-          getContainersMonitor(), resourceAllocationToFreeUp,
-          container.getResource());
+    ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
+        resourceAllocationToFreeUp, opportunisticResourcesToBeReleased);
+
+    // Deduct any remaining resources available
+    Resource availableResources = utilizationTracker.getAvailableResources();
+    if (availableResources.getVirtualCores() > 0 &&
+        availableResources.getMemorySize() > 0) {
+      ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
+          resourceAllocationToFreeUp, availableResources);
     }
 
-    // Subtract the overall node resources.
-    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
-        resourceAllocationToFreeUp);
     return resourceAllocationToFreeUp;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 294eddf..9ad4f91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -28,5 +28,7 @@ public enum ContainerSchedulerEventType {
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
   CONTAINER_PAUSED,
-  RECOVERY_COMPLETED
+  RECOVERY_COMPLETED,
+  // Producer: Containers Monitor when over-allocation is on
+  SCHEDULE_CONTAINERS
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
new file mode 100644
index 0000000..58b73d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * Keeps track of container utilization over time and determines how many
+ * resources are available to launch containers when over-allocation is on.
+ */
+public abstract class NMAllocationPolicy {
+  protected final ResourceThresholds overAllocationThresholds;
+  protected final ContainersMonitor containersMonitor;
+
+  public NMAllocationPolicy(
+      ResourceThresholds overAllocationThresholds,
+      ContainersMonitor containersMonitor) {
+    this.containersMonitor = containersMonitor;
+    this.overAllocationThresholds = overAllocationThresholds;
+  }
+
+  /**
+   * Handle container launch events.
+   * @param container the container that has been launched
+   */
+  public void containerLaunched(Container container) {
+
+  }
+
+  /**
+   * Handle container release events.
+   * @param container the container that has been released
+   */
+  public void containerReleased(Container container) {
+
+  }
+
+  /**
+   * Get the amount of resources to launch containers when
+   * over-allocation is turned on.
+   * @return the amount of resources available to launch containers
+   */
+  public abstract Resource getAvailableResources();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 3c17eca..98d99c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
@@ -38,22 +39,20 @@ public interface ResourceUtilizationTracker {
   ResourceUtilization getCurrentUtilization();
 
   /**
-   * Add Container's resources to Node Utilization.
-   * @param container Container.
+   * Get the amount of resources currently available to launch containers.
+   * @return Resource resources available to launch containers
    */
-  void addContainerResources(Container container);
+  Resource getAvailableResources();
 
   /**
-   * Subtract Container's resources to Node Utilization.
+   * Add Container's resources to Node Utilization upon container launch.
    * @param container Container.
    */
-  void subtractContainerResource(Container container);
+  void containerLaunched(Container container);
 
   /**
-   * Check if NM has resources available currently to run the container.
+   * Subtract Container's resources from Node Utilization upon release.
    * @param container Container.
-   * @return True, if NM has resources available currently to run the container.
    */
-  boolean hasResourcesAvailable(Container container);
-
+  void containerReleased(Container container);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
new file mode 100644
index 0000000..f486506
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * An implementation of NMAllocationPolicy based on the snapshot of the
+ * latest containers utilization to determine how much resources are
+ * available to launch containers when over-allocation is turned on.
+ * Memory is computed with 64-bit arithmetic so it cannot overflow an int.
+ */
+public class SnapshotBasedOverAllocationPolicy
+    extends NMAllocationPolicy {
+
+  public SnapshotBasedOverAllocationPolicy(
+      ResourceThresholds overAllocationThresholds,
+      ContainersMonitor containersMonitor) {
+    super(overAllocationThresholds, containersMonitor);
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    ResourceUtilization utilization =
+        containersMonitor.getContainersUtilization(true).getUtilization();
+    long memoryAvailable = Math.round(
+        (double) overAllocationThresholds.getMemoryThreshold() *
+            containersMonitor.getPmemAllocatedForContainers()) -
+        ((long) utilization.getPhysicalMemory() << 20);
+    int vcoreAvailable = Math.round(
+        (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) *
+            containersMonitor.getVCoresAllocatedForContainers());
+    return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
new file mode 100644
index 0000000..6f9bc82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource availability tracker that reports the component-wise maximum
+ * of the unallocated resources and the resources the over-allocation policy
+ * deems available; without such a policy the latter is treated as none.
+ */
+public class UtilizationBasedResourceTracker
+    extends AllocationBasedResourceTracker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UtilizationBasedResourceTracker.class);
+
+  private final NMAllocationPolicy overAllocationPolicy;
+
+  UtilizationBasedResourceTracker(ContainerScheduler scheduler) {
+    super(scheduler);
+    this.overAllocationPolicy =
+        getContainersMonitor().getContainerOverAllocationPolicy();
+  }
+
+  @Override
+  public void containerLaunched(Container container) {
+    super.containerLaunched(container);
+    if (overAllocationPolicy != null) {
+      overAllocationPolicy.containerLaunched(container);
+    }
+  }
+
+  @Override
+  public void containerReleased(Container container) {
+    super.containerReleased(container);
+    if (overAllocationPolicy != null) {
+      overAllocationPolicy.containerReleased(container);
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    Resource resourceBasedOnAllocation = getUnallocatedResources();
+    Resource resourceBasedOnUtilization =
+        getResourcesAvailableBasedOnUtilization();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The amount of resources available based on allocation is " +
+          resourceBasedOnAllocation + ", based on utilization is " +
+          resourceBasedOnUtilization);
+    }
+
+    return Resources.componentwiseMax(resourceBasedOnAllocation,
+        resourceBasedOnUtilization);
+  }
+
+  /**
+   * Get the amount of resources based on the slack between the actual
+   * utilization and desired utilization; none if no policy is configured.
+   * @return Resource resource available
+   */
+  private Resource getResourcesAvailableBasedOnUtilization() {
+    if (overAllocationPolicy == null) {
+      return Resources.none();
+    }
+
+    return overAllocationPolicy.getAvailableResources();
+  }
+
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return getContainersMonitor().getContainersUtilization(false)
+        .getUtilization();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 493aa4c..92613ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -345,6 +346,40 @@ public abstract class BaseContainerManagerTest {
           fStates.contains(containerStatus.getState()));
   }
 
+  public static void waitForContainerSubState(
+      ContainerManagementProtocol containerManager, ContainerId containerID,
+      ContainerSubState finalState)
+      throws InterruptedException, YarnException, IOException {
+    waitForContainerSubState(containerManager, containerID,
+        Arrays.asList(finalState), 20);  // single state, timeOutMax of 20
+  }
+  public static void waitForContainerSubState(
+      ContainerManagementProtocol containerManager, ContainerId containerID,
+      List<ContainerSubState> finalStates, int timeOutMax)
+      throws InterruptedException, YarnException, IOException {
+    List<ContainerId> list = new ArrayList<>();
+    list.add(containerID);
+    GetContainerStatusesRequest request =
+        GetContainerStatusesRequest.newInstance(list);
+    ContainerStatus containerStatus;
+    HashSet<ContainerSubState> fStates = new HashSet<>(finalStates);
+    int timeoutSecs = 0;  // number of polls made so far
+    do {
+      Thread.sleep(1000);  // sleep one second before every poll
+      containerStatus =
+          containerManager.getContainerStatuses(request)
+              .getContainerStatuses().get(0);  // only one ID was requested
+      LOG.info("Waiting for container to get into one of states " + fStates
+          + ". Current state is " + containerStatus.getContainerSubState());
+      timeoutSecs += 1;
+    } while (!fStates.contains(containerStatus.getContainerSubState())
+        && timeoutSecs < timeOutMax);  // stop on a match or after max polls
+    LOG.info("Container state is " + containerStatus.getContainerSubState());
+    Assert.assertTrue("ContainerSubState is not correct (timedout)",
+        fStates.contains(containerStatus.getContainerSubState()));
+  }
+
+
   public static void waitForApplicationState(
       ContainerManagerImpl containerManager, ApplicationId appID,
       ApplicationState finalState)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 8aee532..c071283 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -288,7 +288,7 @@ public class TestContainersMonitorResourceChange {
     // will be 0.
     assertEquals(
         "Resource utilization must be default with MonitorThread's first run",
-        0, containersMonitor.getContainersUtilization()
+        0, containersMonitor.getContainersUtilization(false).getUtilization()
             .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
 
     // Verify the container utilization value. Since atleast one round is done,
@@ -303,8 +303,9 @@ public class TestContainersMonitorResourceChange {
       ContainersMonitorImpl containersMonitor, int timeoutMsecs)
       throws InterruptedException {
     int timeWaiting = 0;
-    while (0 == containersMonitor.getContainersUtilization()
-        .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) {
+    while (0 == containersMonitor.getContainersUtilization(false)
+        .getUtilization().compareTo(
+            ResourceUtilization.newInstance(0, 0, 0.0f))) {
       if (timeWaiting >= timeoutMsecs) {
         break;
       }
@@ -316,7 +317,7 @@ public class TestContainersMonitorResourceChange {
     }
 
     assertTrue("Resource utilization is not changed from second run onwards",
-        0 != containersMonitor.getContainersUtilization()
+        0 != containersMonitor.getContainersUtilization(false).getUtilization()
             .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
new file mode 100644
index 0000000..1e8bfdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link AllocationBasedResourceTracker} class.
+ */
+public class TestAllocationBasedResourceTracker {
+  private ContainerScheduler mockContainerScheduler;
+
+  @Before
+  public void setup() {
+    mockContainerScheduler = mock(ContainerScheduler.class);
+    ContainersMonitor containersMonitor =
+        new ContainersMonitorImpl(mock(ContainerExecutor.class),
+            mock(AsyncDispatcher.class), mock(Context.class));
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
+    conf.setInt(YarnConfiguration.NM_VCORES, 8);
+    containersMonitor.init(conf);
+    when(mockContainerScheduler.getContainersMonitor())
+        .thenReturn(containersMonitor);
+  }
+
+  /**
+   * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full,
+   * the available resources must no longer fit another container.
+   */
+  @Test
+  public void testHasResourcesAvailable() {
+    AllocationBasedResourceTracker tracker =
+        new AllocationBasedResourceTracker(mockContainerScheduler);
+    Container testContainer = mock(Container.class);
+    when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          isResourcesAvailable(tracker.getAvailableResources(), testContainer));
+      tracker.containerLaunched(testContainer);
+    }
+    Assert.assertFalse(
+        isResourcesAvailable(tracker.getAvailableResources(), testContainer));
+  }
+
+  // Check each component: Resource#compareTo is an ordering, not a fit test.
+  private static boolean isResourcesAvailable(Resource free, Container c) {
+    return free.getMemorySize() >= c.getResource().getMemorySize()
+        && free.getVirtualCores() >= c.getResource().getVirtualCores();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08116cc4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
deleted file mode 100644
index 82c2147..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the {@link AllocationBasedResourceUtilizationTracker} class.
- */
-public class TestAllocationBasedResourceUtilizationTracker {
-
-  private ContainerScheduler mockContainerScheduler;
-
-  @Before
-  public void setup() {
-    mockContainerScheduler = mock(ContainerScheduler.class);
-    ContainersMonitor containersMonitor =
-        new ContainersMonitorImpl(mock(ContainerExecutor.class),
-            mock(AsyncDispatcher.class), mock(Context.class));
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
-    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
-    conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
-    conf.setInt(YarnConfiguration.NM_VCORES, 8);
-    containersMonitor.init(conf);
-    when(mockContainerScheduler.getContainersMonitor())
-        .thenReturn(containersMonitor);
-  }
-
-  /**
-   * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
-   * hasResourceAvailable should return false.
-   */
-  @Test
-  public void testHasResourcesAvailable() {
-    AllocationBasedResourceUtilizationTracker tracker =
-        new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
-    Container testContainer = mock(Container.class);
-    when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
-    for (int i = 0; i < 2; i++) {
-      Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
-      tracker.addContainerResources(testContainer);
-    }
-    Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
-  }
-
-  /**
-   * Test the case where the current allocation has been truncated to 0.8888891
-   * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return
-   * true.
-   */
-  @Test
-  public void testHasEnoughCpu() {
-    AllocationBasedResourceUtilizationTracker tracker =
-        new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
-    float currentAllocation = 0.8888891f;
-    long totalCores = 9;
-    int alreadyUsedCores = 8;
-    Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
-        (int) totalCores - alreadyUsedCores));
-    Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
-        (int) totalCores - alreadyUsedCores + 1));
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org