You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/08 20:50:23 UTC

[1/2] hadoop git commit: Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."

Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 c1362b68a -> f9a7055b9


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
index 2cef53a..2ae8b97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -64,8 +63,8 @@ public class TestContainerSchedulerRecovery {
 
   @Mock private ContainerId containerId;
 
-  @Mock private AllocationBasedResourceTracker
-      allocationBasedResourceTracker;
+  @Mock private AllocationBasedResourceUtilizationTracker
+      allocationBasedResourceUtilizationTracker;
 
   @InjectMocks private ContainerScheduler tempContainerScheduler =
       new ContainerScheduler(context, dispatcher, metrics, 0);
@@ -76,13 +75,12 @@ public class TestContainerSchedulerRecovery {
     MockitoAnnotations.initMocks(this);
     spy = spy(tempContainerScheduler);
     when(container.getContainerId()).thenReturn(containerId);
-    when(container.getResource()).thenReturn(Resource.newInstance(1024, 1));
     when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
     when(containerId.getApplicationAttemptId().getApplicationId())
         .thenReturn(appId);
     when(containerId.getContainerId()).thenReturn(123L);
-    doNothing().when(allocationBasedResourceTracker)
-        .containerLaunched(container);
+    doNothing().when(allocationBasedResourceUtilizationTracker)
+        .addContainerResources(container);
   }
 
   @After public void tearDown() {
@@ -103,8 +101,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(1, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
@@ -122,8 +120,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(1, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as PAUSED, GUARANTEED,
@@ -141,8 +139,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(1, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
@@ -160,8 +158,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(1, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as LAUNCHED, GUARANTEED,
@@ -179,8 +177,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(1, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
@@ -198,8 +196,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(1, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(1))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as REQUESTED, GUARANTEED,
@@ -217,8 +215,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
@@ -236,8 +234,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as COMPLETED, GUARANTEED,
@@ -255,8 +253,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
@@ -274,8 +272,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as GUARANTEED but no executionType set,
@@ -292,8 +290,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 
   /*Test if a container is recovered as PAUSED but no executionType set,
@@ -310,7 +308,7 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    Mockito.verify(allocationBasedResourceTracker, Mockito.times(0))
-        .containerLaunched(container);
+    Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+        .addContainerResources(container);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
deleted file mode 100644
index 384b116..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
+++ /dev/null
@@ -1,1121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ContainerSubState;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.exceptions.ConfigurationException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
-import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Test ContainerScheduler behaviors when NM overallocation is turned on.
- */
-public class TestContainerSchedulerWithOverAllocation
-    extends BaseContainerManagerTest {
-  private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3;
-  private static final int NM_CONTAINERS_VCORES = 4;
-  private static final int NM_CONTAINERS_MEMORY_MB = 2048;
-
-  static {
-    LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
-  }
-
-  public TestContainerSchedulerWithOverAllocation()
-      throws UnsupportedFileSystemException {
-  }
-
-  @Override
-  protected ContainerExecutor createContainerExecutor() {
-    DefaultContainerExecutor exec =
-        new LongRunningContainerSimulatingContainerExecutor();
-    exec.setConf(conf);
-    return exec;
-  }
-
-  @Override
-  protected ContainerManagerImpl createContainerManager(
-      DeletionService delSrvc) {
-    return new LongRunningContainerSimulatingContainersManager(
-        context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user);
-  }
-
-  @Override
-  public void setup() throws IOException {
-    conf.setInt(
-        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
-        NM_OPPORTUNISTIC_QUEUE_LIMIT);
-    conf.setFloat(
-        YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD,
-        0.75f);
-    conf.setFloat(
-        YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
-        0.75f);
-    super.setup();
-  }
-
-  /**
-   * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate do
-   * not exceed the capacity of the node. Both containers are expected to start
-   * running immediately.
-   */
-  @Test
-  public void testStartMultipleContainersWithoutOverallocation()
-      throws Exception {
-    containerManager.start();
-
-    StartContainersRequest allRequests = StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() { {
-          add(createStartContainerRequest(0,
-              BuilderUtils.newResource(1024, 1), false));
-          add(createStartContainerRequest(1,
-              BuilderUtils.newResource(1024, 1), true));
-        } }
-    );
-    containerManager.startContainers(allRequests);
-
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start one GUARANTEED and one OPPORTUNISTIC containers whose utilization
-   * is very low relative to their resource request, resulting in a low node
-   * utilization. Then start another OPPORTUNISTIC containers which requests
-   * more than what's left unallocated on the node. Due to overallocation
-   * being turned on and node utilization being low, the second OPPORTUNISTIC
-   * container is also expected to be launched immediately.
-   */
-  @Test
-  public void testStartOppContainersWithPartialOverallocationLowUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), true));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
-    // the current containers utilization is low
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
-    // start a container that requests more than what's left unallocated
-    // 512 + 1024 + 824 > 2048
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
-    ));
-
-    // this container is expected to be started immediately because there
-    // are (memory: 1024, vcore: 0.625) available based on over-allocation
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(2), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start one GUARANTEED and one OPPORTUNISTIC containers which utilizes most
-   * of the resources they requested, resulting in a high node utilization.
-   * Then start another OPPORTUNISTIC containers which requests more than what's
-   * left unallocated on the node. Because of the high resource utilization on
-   * the node, the projected utilization, if we were to start the second
-   * OPPORTUNISTIC container immediately, will go over the NM overallocation
-   * threshold, so the second OPPORTUNISTIC container is expected to be queued.
-   */
-  @Test
-  public void testQueueOppContainerWithPartialOverallocationHighUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), true));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(
-        containerManager, createContainerId(1), ContainerSubState.RUNNING);
-
-    // the containers utilization is high
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1500, 0, 1.0f/8));
-
-    // start a container that requests more than what's left unallocated
-    // 512 + 1024 + 824 > 2048
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
-    ));
-    // this container will not start immediately because there is not
-    // enough resource available at the moment either in terms of
-    // resources unallocated or in terms of the actual utilization
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.SCHEDULED);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.SCHEDULED);
-      }
-    });
-  }
-
-  /**
-   * Start two GUARANTEED containers which in aggregate takes up the whole node
-   * capacity, yet whose utilization is low relative to their resource request,
-   * resulting in a low node resource utilization. Then try to start another
-   * OPPORTUNISTIC containers. Because the resource utilization across the node
-   * is low and overallocation being turned on, the OPPORTUNISTIC container is
-   * expected to be launched immediately even though there is no resources left
-   * unallocated.
-   */
-  @Test
-  public void testStartOppContainersWithOverallocationLowUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), true));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-
-    // the current containers utilization is low
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(800, 0, 1.0f/8));
-
-    // start a container when there is no resources left unallocated.
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
-    ));
-
-    // this container is expected to be started because there is resources
-    // available because the actual utilization is very low
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-
-  /**
-   * Start two GUARANTEED containers which in aggregate take up the whole node
-   * capacity and fully utilize the resources they requested. Then try to start
-   * four OPPORTUNISTIC containers of which three will be queued and one will be
-   * killed because of the max queue length is 3.
-   */
-  @Test
-  public void testQueueOppContainersWithFullUtilization() throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), true));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-
-    // the containers are fully utilizing their resources
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(2048, 0, 1.0f/8));
-
-    // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container
-    // queue can hold when there is no unallocated resource left.
-    List<StartContainerRequest> moreContainerRequests =
-        new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1);
-    for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) {
-      moreContainerRequests.add(
-          createStartContainerRequest(2 + a,
-              BuilderUtils.newResource(512, 1), false));
-    }
-    containerManager.startContainers(
-        StartContainersRequest.newInstance(moreContainerRequests));
-
-    // All OPPORTUNISTIC containers but the last one should be queued.
-    // The last OPPORTUNISTIC container to launch should be killed.
-    BaseContainerManagerTest.waitForContainerState(
-        containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2),
-        ContainerState.COMPLETE);
-
-    HashMap<ContainerId, ContainerSubState> expectedContainerStatus =
-        new HashMap<>();
-    expectedContainerStatus.put(
-        createContainerId(0), ContainerSubState.RUNNING);
-    expectedContainerStatus.put(
-        createContainerId(1), ContainerSubState.RUNNING);
-    expectedContainerStatus.put(
-        createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT),
-        ContainerSubState.DONE);
-    for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) {
-      expectedContainerStatus.put(
-          createContainerId(i + 2), ContainerSubState.SCHEDULED);
-    }
-    verifyContainerStatuses(expectedContainerStatus);
-  }
-
-  /**
-   * Start two GUARANTEED containers that together does not take up the
-   * whole node. Then try to start one OPPORTUNISTIC container that will
-   * fit into the remaining unallocated space on the node.
-   * The OPPORTUNISTIC container is expected to start even though the
-   * current node utilization is above the NM overallocation threshold,
-   * because it's always safe to launch containers as long as the node
-   * has not been fully allocated.
-   */
-  @Test
-  public void testStartOppContainerWithHighUtilizationNoOverallocation()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1200, 1), true));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(400, 1), true));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-
-    //  containers utilization is above the over-allocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1600, 0, 1.0f/2));
-
-    // start a container that can just fit in the remaining unallocated space
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(400, 1), false))
-    ));
-
-    // the OPPORTUNISTIC container can be safely launched even though
-    // the container utilization is above the NM overallocation threshold
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start two OPPORTUNISTIC containers first whose utilization is low relative
-   * to the resources they requested, resulting in a low node utilization. Then
-   * try to start a GUARANTEED container which requests more than what's left
-   * unallocated on the node. Because the node utilization is low and NM
-   * overallocation is turned on, the GUARANTEED container is expected to be
-   * started immediately without killing any running OPPORTUNISTIC containers.
-   */
-  @Test
-  public void testKillNoOppContainersWithPartialOverallocationLowUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), false));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-
-    // containers utilization is low
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
-    // start a GUARANTEED container that requests more than what's left
-    // unallocated on the node: (512  + 1024 + 824) > 2048
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), true))
-    ));
-
-    // the GUARANTEED container is expected be launched immediately without
-    // killing any OPPORTUNISTIC containers.
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start two OPPORTUNISTIC containers whose utilization will be high relative
-   * to the resources they requested, resulting in a high node utilization.
-   * Then try to start a GUARANTEED container which requests more than what's
-   * left unallocated on the node. Because the node is under high utilization,
-   * the second OPPORTUNISTIC container is expected to be killed in order to
-   * make room for the GUARANTEED container.
-   */
-  @Test
-  public void testKillOppContainersWithPartialOverallocationHighUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), false));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-
-    // the containers utilization is very high
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1800, 0, 1.0f/8));
-
-    // start a GUARANTEED container that requests more than what's left
-    // unallocated on the node 512 + 1024 + 824 > 2048
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), true))
-    ));
-
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-    // the last launched OPPORTUNISTIC container is expected to be killed
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.DONE);
-
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
-        newInstance(new ArrayList<ContainerId>() {
-          {
-            add(createContainerId(0));
-            add(createContainerId(1));
-            add(createContainerId(2));
-          }
-        });
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(1))) {
-        Assert.assertTrue(status.getDiagnostics().contains(
-            "Container Killed to make room for Guaranteed Container"));
-      } else {
-        Assert.assertEquals(status.getContainerId() + " is not RUNNING",
-            ContainerSubState.RUNNING, status.getContainerSubState());
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-  }
-
-
-  /**
-   * Start three OPPORTUNISTIC containers which in aggregates exceeds the
-   * capacity of the node, yet whose utilization is low relative
-   * to the resources they requested, resulting in a low node utilization.
-   * Then try to start a GUARANTEED container. Even though the node has
-   * nothing left unallocated, it is expected to start immediately
-   * without killing any running OPPORTUNISTIC containers because the node
-   * utilization is very low and overallocation is turned on.
-   */
-  @Test
-  public void testKillNoOppContainersWithOverallocationLowUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(1024, 1), false));
-          }
-        }
-    ));
-    // All three GUARANTEED containers are all expected to start
-    // because the containers utilization is low (0 at the point)
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    // the containers utilization is low
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1024, 0, 1.0f/8));
-
-    // start a GUARANTEED container that requests more than what's left
-    // unallocated on the node: (512  + 1024 + 824) > 2048
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(3,
-                BuilderUtils.newResource(512, 1), true))
-    ));
-
-    // the GUARANTEED container is expected be launched immediately without
-    // killing any OPPORTUNISTIC containers
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(3), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-        put(createContainerId(3), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start four OPPORTUNISTIC containers which in aggregates exceeds the
-   * capacity of the node. The real resource utilization of the first two
-   * OPPORTUNISTIC containers are high whereas that of the latter two are
-   * almost zero. Then try to start a GUARANTEED container. The GUARANTEED
-   * container will eventually start running after preempting the third
-   * and fourth OPPORTUNISTIC container (which releases no resources) and
-   * then the second OPPORTUNISTIC container.
-   */
-  @Test
-  public void
-      testKillOppContainersConservativelyWithOverallocationHighUtilization()
-          throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(3,
-                BuilderUtils.newResource(1024, 1), false));
-          }
-        }
-    ));
-    // All four GUARANTEED containers are all expected to start
-    // because the containers utilization is low (0 at the point)
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(3), ContainerSubState.RUNNING);
-
-    // the containers utilization is at the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
-    // try to start a GUARANTEED container when there's nothing left unallocated
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(4,
-                BuilderUtils.newResource(1024, 1), true))
-    ));
-
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(4), ContainerSubState.RUNNING);
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
-        newInstance(new ArrayList<ContainerId>() {
-          {
-            add(createContainerId(0));
-            add(createContainerId(1));
-            add(createContainerId(2));
-            add(createContainerId(3));
-            add(createContainerId(4));
-          }
-        });
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0)) ||
-          status.getContainerId().equals(createContainerId(4))) {
-        Assert.assertEquals(
-            ContainerSubState.RUNNING, status.getContainerSubState());
-      } else {
-        Assert.assertTrue(status.getDiagnostics().contains(
-            "Container Killed to make room for Guaranteed Container"));
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-  }
-
-  /**
-   * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
-   * which in aggregate exceeds the capacity of the node. The first two
-   * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
-   * one utilizes nearly all of its resource requested. Then try to start two
-   * more OPPORTUNISTIC containers. The two OPPORTUNISTIC containers are
-   * expected to be queued immediately. Upon the completion of the
-   * resource-usage-heavy GUARANTEED container, both OPPORTUNISTIC containers
-   * are expected to start.
-   */
-  @Test
-  public void testStartOppContainersUponContainerCompletion() throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(1024, 1), true));
-          }
-        }
-    ));
-
-    // All three containers are all expected to start immediately
-    // because the node utilization is low (0 at the point)
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    // the contianers utilization is at the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(3,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(4,
-                BuilderUtils.newResource(512, 1), false));
-          }
-        }
-    ));
-    // the two new OPPORTUNISTIC containers are expected to be queued
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(3), ContainerSubState.SCHEDULED);
-        put(createContainerId(4), ContainerSubState.SCHEDULED);
-      }
-    });
-
-    // the GUARANTEED container is completed releasing resources
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(100, 0, 1.0f/5));
-    allowContainerToSucceed(2);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.DONE);
-
-    // the two OPPORTUNISTIC containers are expected to start together
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(3), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(4), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.DONE);
-        put(createContainerId(3), ContainerSubState.RUNNING);
-        put(createContainerId(4), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-  /**
-   * Start one GUARANTEED container that consumes all the resources on the
-   * node and keeps running, followed by two OPPORTUNISTIC containers that
-   * will be queued forever because there is no containers starting or
-   * finishing. Then try to start OPPORTUNISTIC containers out of band.
-   */
-  @Test
-  public void testStartOpportunisticContainersOutOfBand() throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(0,
-                BuilderUtils.newResource(2048, 4), true))));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-
-    // the container is fully utilizing its resources
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(2048, 0, 1.0f));
-
-    // send two OPPORTUNISTIC container requests that are expected to be queued
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false));
-          }
-        }
-    ));
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.SCHEDULED);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.SCHEDULED);
-
-    // the containers utilization dropped to the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
-    // try to start containers out of band.
-    ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
-
-    // no containers in queue are expected to be launched because the
-    // containers utilization is not below the over-allocation threshold
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.SCHEDULED);
-        put(createContainerId(2), ContainerSubState.SCHEDULED);
-      }
-    });
-
-    // the GUARANTEED container is completed releasing resources
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(100, 0, 1.0f/5));
-
-    // the containers utilization dropped way below the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(512, 0, 1.0f/8));
-
-    ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
-
-    // the two OPPORTUNISTIC containers are expected to be launched
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-
-    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
-      {
-        put(createContainerId(0), ContainerSubState.RUNNING);
-        put(createContainerId(1), ContainerSubState.RUNNING);
-        put(createContainerId(2), ContainerSubState.RUNNING);
-      }
-    });
-  }
-
-
-  private void setContainerResourceUtilization(ResourceUtilization usage) {
-    ((ContainerMonitorForOverallocationTest)
-        containerManager.getContainersMonitor())
-            .setContainerResourceUsage(usage);
-  }
-
-  private void allowContainerToSucceed(int containerId) {
-    ((LongRunningContainerSimulatingContainerExecutor) this.exec)
-        .containerSucceeded(createContainerId(containerId));
-  }
-
-
-  protected StartContainerRequest createStartContainerRequest(int containerId,
-      Resource resource, boolean isGuaranteed) throws IOException {
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED :
-        ExecutionType.OPPORTUNISTIC;
-    Token containerToken = createContainerToken(
-        createContainerId(containerId),
-        DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
-        context.getContainerTokenSecretManager(),
-        null, executionType);
-
-    return StartContainerRequest.newInstance(
-        containerLaunchContext, containerToken);
-  }
-
-  protected void verifyContainerStatuses(
-      Map<ContainerId, ContainerSubState> expected)
-      throws IOException, YarnException {
-    List<ContainerId> statList = new ArrayList<>(expected.keySet());
-    GetContainerStatusesRequest statRequest =
-        GetContainerStatusesRequest.newInstance(statList);
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-
-    for (ContainerStatus status : containerStatuses) {
-      ContainerId containerId = status.getContainerId();
-      Assert.assertEquals(containerId + " is in unexpected state",
-          expected.get(containerId), status.getContainerSubState());
-    }
-  }
-
-  /**
-   * A container manager that sends a dummy container pid while it's cleaning
-   * up running containers. Used along with
-   * LongRunningContainerSimulatingContainerExecutor to simulate long running
-   * container processes for testing purposes.
-   */
-  private static class LongRunningContainerSimulatingContainersManager
-      extends ContainerManagerImpl {
-
-    private final String user;
-
-    LongRunningContainerSimulatingContainersManager(
-        Context context, ContainerExecutor exec,
-        DeletionService deletionContext,
-        NodeStatusUpdater nodeStatusUpdater,
-        NodeManagerMetrics metrics,
-        LocalDirsHandlerService dirsHandler, String user) {
-      super(context, exec, deletionContext,
-          nodeStatusUpdater, metrics, dirsHandler);
-      this.user = user;
-    }
-
-    @Override
-    protected UserGroupInformation getRemoteUgi() throws YarnException {
-      ApplicationId appId = ApplicationId.newInstance(0, 0);
-      ApplicationAttemptId appAttemptId =
-          ApplicationAttemptId.newInstance(appId, 1);
-      UserGroupInformation ugi =
-          UserGroupInformation.createRemoteUser(appAttemptId.toString());
-      ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-          .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
-          .getKeyId()));
-      return ugi;
-    }
-
-    /**
-     * Create a container launcher that signals container processes
-     * with a dummy pid. The container processes are simulated in
-     * LongRunningContainerSimulatingContainerExecutor which does
-     * not write a pid file on behalf of containers to launch, so
-     * the pid does not matter.
-     */
-    @Override
-    protected ContainersLauncher createContainersLauncher(
-        Context context, ContainerExecutor exec) {
-      ContainerManagerImpl containerManager = this;
-      return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
-          this) {
-        @Override
-        protected ContainerLaunch createContainerLaunch(
-            Application app, Container container) {
-          return new ContainerLaunch(context, getConfig(), dispatcher,
-              exec, app, container, dirsHandler, containerManager) {
-            @Override
-            protected String getContainerPid(Path pidFilePath)
-                throws Exception {
-              return "123";
-            }
-
-          };
-        }
-      };
-    }
-
-    @Override
-    protected ContainersMonitor createContainersMonitor(
-        ContainerExecutor exec) {
-      return new ContainerMonitorForOverallocationTest(exec,
-          dispatcher, context);
-    }
-
-    public void startContainersOutOfBandUponLowUtilization() {
-      ((ContainerMonitorForOverallocationTest) getContainersMonitor())
-          .attemptToStartContainersUponLowUtilization();
-    }
-  }
-
-  /**
-   * A container executor that simulates long running container processes
-   * by having container launch threads sleep infinitely until it's given
-   * a signal to finish with either a success or failure exit code.
-   */
-  private static class LongRunningContainerSimulatingContainerExecutor
-      extends DefaultContainerExecutor {
-    private ConcurrentHashMap<ContainerId, ContainerFinishLatch> containers =
-        new ConcurrentHashMap<>();
-
-    public void containerSucceeded(ContainerId containerId) {
-      ContainerFinishLatch containerFinishLatch = containers.get(containerId);
-      if (containerFinishLatch != null) {
-        containerFinishLatch.toSucceed();
-      }
-    }
-
-    public void containerFailed(ContainerId containerId) {
-      ContainerFinishLatch containerFinishLatch = containers.get(containerId);
-      if (containerFinishLatch != null) {
-        containerFinishLatch.toFail();
-      }
-    }
-
-    /**
-     * Simulate long running container processes by having container launcher
-     * threads wait infinitely for a signal to finish.
-     */
-    @Override
-    public int launchContainer(ContainerStartContext ctx)
-        throws IOException, ConfigurationException {
-      ContainerId container = ctx.getContainer().getContainerId();
-      containers.putIfAbsent(container, new ContainerFinishLatch(container));
-
-      // simulate a long running container process by having the
-      // container launch thread sleep forever until it's given a
-      // signal to finish with a exit code.
-      while (!containers.get(container).toProceed) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          return -1;
-        }
-      }
-
-      return containers.get(container).getContainerExitCode();
-    }
-
-    /**
-     * Override signalContainer() so that simulated container processes
-     * are properly cleaned up.
-     */
-    @Override
-    public boolean signalContainer(ContainerSignalContext ctx)
-        throws IOException {
-      containerSucceeded(ctx.getContainer().getContainerId());
-      return true;
-    }
-
-    /**
-     * A signal that container launch threads wait for before exiting
-     * in order to simulate long running container processes.
-     */
-    private static final class ContainerFinishLatch {
-      volatile boolean toProceed;
-      int exitCode;
-      ContainerId container;
-
-      ContainerFinishLatch(ContainerId containerId) {
-        exitCode = 0;
-        toProceed = false;
-        container = containerId;
-      }
-
-      void toSucceed() {
-        exitCode = 0;
-        toProceed = true;
-      }
-
-      void toFail() {
-        exitCode = -101;
-        toProceed = true;
-      }
-
-      int getContainerExitCode() {
-        // read barrier of toProceed to make sure the exit code is not stale
-        if (toProceed) {
-          LOG.debug(container + " finished with exit code: " + exitCode);
-        }
-        return exitCode;
-      }
-    }
-  }
-
-  /**
-   * A test implementation of container monitor that allows control of
-   * current resource utilization.
-   */
-  private static class ContainerMonitorForOverallocationTest
-      extends ContainersMonitorImpl {
-
-    private ResourceUtilization containerResourceUsage =
-        ResourceUtilization.newInstance(0, 0, 0.0f);
-
-    ContainerMonitorForOverallocationTest(ContainerExecutor exec,
-        AsyncDispatcher dispatcher, Context context) {
-      super(exec, dispatcher, context);
-    }
-
-    @Override
-    public long getPmemAllocatedForContainers() {
-      return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
-    }
-
-    @Override
-    public long getVmemAllocatedForContainers() {
-      float pmemRatio = getConfig().getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      return (long) (pmemRatio * getPmemAllocatedForContainers());
-    }
-
-    @Override
-    public long getVCoresAllocatedForContainers() {
-      return NM_CONTAINERS_VCORES;
-    }
-
-    @Override
-    public ContainersResourceUtilization getContainersUtilization(
-        boolean latest) {
-      return new ContainersMonitor.ContainersResourceUtilization(
-          containerResourceUsage, System.currentTimeMillis());
-    }
-
-    public void setContainerResourceUsage(
-        ResourceUtilization containerResourceUsage) {
-      this.containerResourceUsage = containerResourceUsage;
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."

Posted by ha...@apache.org.
Revert "YARN-6675. Add NM support to launch opportunistic containers based on overallocation. Contributed by Haibo Chen."

This reverts commit c1362b68af03c546f4c0802758e9729d2372ab6c.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f9a7055b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f9a7055b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f9a7055b

Branch: refs/heads/YARN-1011
Commit: f9a7055b94f4c1d4187af6a189e20f51b6396fd6
Parents: c1362b6
Author: Haibo Chen <ha...@apache.org>
Authored: Tue May 8 13:50:06 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Tue May 8 13:50:06 2018 -0700

----------------------------------------------------------------------
 .../nodemanager/NodeStatusUpdaterImpl.java      |    2 +-
 .../containermanager/ContainerManagerImpl.java  |    8 +-
 .../launcher/ContainerLaunch.java               |    2 +-
 .../launcher/ContainersLauncher.java            |    9 +-
 .../monitor/ContainersMonitor.java              |   38 +-
 .../monitor/ContainersMonitorImpl.java          |   56 +-
 .../AllocationBasedResourceTracker.java         |  114 --
 ...locationBasedResourceUtilizationTracker.java |  158 +++
 .../scheduler/ContainerScheduler.java           |  318 ++---
 .../scheduler/ContainerSchedulerEventType.java  |    4 +-
 .../scheduler/NMAllocationPolicy.java           |   63 -
 .../scheduler/ResourceUtilizationTracker.java   |   17 +-
 .../SnapshotBasedOverAllocationPolicy.java      |   54 -
 .../UtilizationBasedResourceTracker.java        |   95 --
 .../BaseContainerManagerTest.java               |   35 -
 .../TestContainersMonitorResourceChange.java    |    9 +-
 .../TestAllocationBasedResourceTracker.java     |   82 --
 ...locationBasedResourceUtilizationTracker.java |   93 ++
 .../TestContainerSchedulerRecovery.java         |   58 +-
 ...estContainerSchedulerWithOverAllocation.java | 1121 ------------------
 20 files changed, 420 insertions(+), 1916 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index a2f70d5..44f9740 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -532,7 +532,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private ResourceUtilization getContainersUtilization() {
     ContainersMonitor containersMonitor =
         this.context.getContainerManager().getContainersMonitor();
-    return containersMonitor.getContainersUtilization(false).getUtilization();
+    return containersMonitor.getContainersUtilization();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 77ea4fa..3470910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -243,12 +243,6 @@ public class ContainerManagerImpl extends CompositeService implements
             metrics);
     addService(rsrcLocalizationSrvc);
 
-    this.containersMonitor = createContainersMonitor(exec);
-    addService(this.containersMonitor);
-
-    // ContainersLauncher must be added after ContainersMonitor
-    // because the former depends on the latter to initialize
-    // over-allocation first.
     containersLauncher = createContainersLauncher(context, exec);
     addService(containersLauncher);
 
@@ -273,6 +267,8 @@ public class ContainerManagerImpl extends CompositeService implements
       nmMetricsPublisher = createNMTimelinePublisher(context);
       context.setNMTimelinePublisher(nmMetricsPublisher);
     }
+    this.containersMonitor = createContainersMonitor(exec);
+    addService(this.containersMonitor);
 
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 5fb9b51..3875cbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1074,7 +1074,7 @@ public class ContainerLaunch implements Callable<Integer> {
    * @return Process ID
    * @throws Exception
    */
-  protected String getContainerPid(Path pidFilePath) throws Exception {
+  private String getContainerPid(Path pidFilePath) throws Exception {
     String containerIdStr = 
         container.getContainerId().toString();
     String processId = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index c3d0a4d..cfd5d6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -116,7 +116,8 @@ public class ContainersLauncher extends AbstractService
               containerId.getApplicationAttemptId().getApplicationId());
 
         ContainerLaunch launch =
-            createContainerLaunch(app, event.getContainer());
+            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
+              event.getContainer(), dirsHandler, containerManager);
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
@@ -212,10 +213,4 @@ public class ContainersLauncher extends AbstractService
         break;
     }
   }
-
-  protected ContainerLaunch createContainerLaunch(
-      Application app, Container container) {
-    return new ContainerLaunch(context, getConfig(), dispatcher,
-        exec, app, container, dirsHandler, containerManager);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 8da4ec4..64831e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -23,24 +23,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
-
-  /**
-   * Get the aggregate resource utilization of containers running on the node,
-   * with a timestamp of the measurement.
-   * @param latest true if the latest result should be returned
-   * @return ResourceUtilization resource utilization of all containers
-   */
-  ContainersResourceUtilization getContainersUtilization(boolean latest);
-
-  /**
-   * Get the policy to over-allocate containers when over-allocation is on.
-   * @return null if over-allocation is turned off
-   */
-  NMAllocationPolicy getContainerOverAllocationPolicy();
+  ResourceUtilization getContainersUtilization();
 
   float getVmemRatio();
 
@@ -80,26 +66,4 @@ public interface ContainersMonitor extends Service,
         * containersMonitor.getVmemRatio());
     resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
   }
-
-  /**
-   * A snapshot of resource utilization of all containers with the timestamp.
-   */
-  final class ContainersResourceUtilization {
-    private final ResourceUtilization utilization;
-    private final long timestamp;
-
-    public ContainersResourceUtilization(
-        ResourceUtilization utilization, long timestamp) {
-      this.utilization = utilization;
-      this.timestamp = timestamp;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public ResourceUtilization getUtilization() {
-      return utilization;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index c3312f8..acc256f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -23,10 +23,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,9 +106,8 @@ public class ContainersMonitorImpl extends AbstractService implements
     CPU, MEMORY
   }
 
-  private ContainersResourceUtilization latestContainersUtilization;
+  private ResourceUtilization containersUtilization;
 
-  private NMAllocationPolicy overAllocationPolicy;
   private ResourceThresholds overAllocationPreemptionThresholds;
   private int overAlloctionPreemptionCpuCount = -1;
 
@@ -128,8 +123,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     this.monitoringThread = new MonitoringThread();
 
-    this.latestContainersUtilization = new ContainersResourceUtilization(
-        ResourceUtilization.newInstance(-1, -1, -1.0f), -1L);
+    this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
   }
 
   @Override
@@ -336,10 +330,6 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance(
         cpuPreemptionThreshold, memoryPreemptionThreshold);
 
-    // TODO make this configurable
-    this.overAllocationPolicy =
-        createOverAllocationPolicy(resourceThresholds);
-
     LOG.info("NodeManager oversubscription enabled with overallocation " +
         "thresholds (memory:" + overAllocationMemoryUtilizationThreshold +
         ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" +
@@ -347,11 +337,6 @@ public class ContainersMonitorImpl extends AbstractService implements
         cpuPreemptionThreshold + ")");
   }
 
-  protected NMAllocationPolicy createOverAllocationPolicy(
-      ResourceThresholds resourceThresholds) {
-    return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
-  }
-
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -624,12 +609,7 @@ public class ContainersMonitorImpl extends AbstractService implements
         }
 
         // Save the aggregated utilization of the containers
-        setLatestContainersUtilization(trackedContainersUtilization);
-
-        // check opportunity to start containers if over-allocation is on
-        if (context.isOverAllocationEnabled()) {
-          attemptToStartContainersUponLowUtilization();
-        }
+        setContainersUtilization(trackedContainersUtilization);
 
         // Publish the container utilization metrics to node manager
         // metrics system.
@@ -995,34 +975,12 @@ public class ContainersMonitorImpl extends AbstractService implements
   }
 
   @Override
-  public ContainersResourceUtilization getContainersUtilization(
-      boolean latest) {
-    // TODO update containerUtilization if latest is true
-    return this.latestContainersUtilization;
-  }
-
-  @Override
-  public NMAllocationPolicy getContainerOverAllocationPolicy() {
-    return overAllocationPolicy;
-  }
-
-  private void setLatestContainersUtilization(ResourceUtilization utilization) {
-    this.latestContainersUtilization = new ContainersResourceUtilization(
-        utilization, System.currentTimeMillis());
+  public ResourceUtilization getContainersUtilization() {
+    return this.containersUtilization;
   }
 
-  @VisibleForTesting
-  public void attemptToStartContainersUponLowUtilization() {
-    if (getContainerOverAllocationPolicy() != null) {
-      Resource available = getContainerOverAllocationPolicy()
-          .getAvailableResources();
-      if (available.getMemorySize() > 0 &&
-          available.getVirtualCores() > 0) {
-        eventDispatcher.getEventHandler().handle(
-            new ContainerSchedulerEvent(null,
-                ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
-      }
-    }
+  private void setContainersUtilization(ResourceUtilization utilization) {
+    this.containersUtilization = utilization;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
deleted file mode 100644
index 86b3698..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the resource utilization tracker that equates
- * resource utilization with the total resource allocated to the container.
- */
-public class AllocationBasedResourceTracker
-    implements ResourceUtilizationTracker {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
-  private static final Resource UNAVAILABLE =
-      Resource.newInstance(0, 0);
-
-  private ResourceUtilization containersAllocation;
-  private ContainerScheduler scheduler;
-
-
-  AllocationBasedResourceTracker(ContainerScheduler scheduler) {
-    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
-    this.scheduler = scheduler;
-  }
-
-  /**
-   * Get the accumulation of totally allocated resources to containers.
-   * @return ResourceUtilization Resource Utilization.
-   */
-  @Override
-  public ResourceUtilization getCurrentUtilization() {
-    return this.containersAllocation;
-  }
-
-  /**
-   * Get the amount of resources that have not been allocated to containers.
-   * @return Resource resources that have not been allocated to containers.
-   */
-  protected Resource getUnallocatedResources() {
-    // unallocated resources = node capacity - containers allocation
-    // = -(container allocation - node capacity)
-    ResourceUtilization allocationClone =
-        ResourceUtilization.newInstance(containersAllocation);
-    getContainersMonitor()
-        .subtractNodeResourcesFromResourceUtilization(allocationClone);
-
-    Resource unallocated = UNAVAILABLE;
-    if (allocationClone.getCPU() <= 0 &&
-        allocationClone.getPhysicalMemory() <= 0 &&
-        allocationClone.getVirtualMemory() <= 0) {
-      int cpu = Math.round(allocationClone.getCPU() *
-          getContainersMonitor().getVCoresAllocatedForContainers());
-      long memory = allocationClone.getPhysicalMemory();
-      unallocated = Resource.newInstance(-memory, -cpu);
-    }
-    return unallocated;
-  }
-
-
-  @Override
-  public Resource getAvailableResources() {
-    return getUnallocatedResources();
-  }
-
-  /**
-   * Add Container's resources to the accumulated allocation.
-   * @param container Container.
-   */
-  @Override
-  public void containerLaunched(Container container) {
-    ContainersMonitor.increaseResourceUtilization(
-        getContainersMonitor(), this.containersAllocation,
-        container.getResource());
-  }
-
-  /**
-   * Subtract Container's resources to the accumulated allocation.
-   * @param container Container.
-   */
-  @Override
-  public void containerReleased(Container container) {
-    ContainersMonitor.decreaseResourceUtilization(
-        getContainersMonitor(), this.containersAllocation,
-        container.getResource());
-  }
-
-  public ContainersMonitor getContainersMonitor() {
-    return this.scheduler.getContainersMonitor();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..6e2b617
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link ResourceUtilizationTracker} that equates
+ * resource utilization with the total resources allocated to containers.
+ */
+public class AllocationBasedResourceUtilizationTracker implements
+    ResourceUtilizationTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+
+  private final ResourceUtilization containersAllocation;
+  private final ContainerScheduler scheduler;
+
+  AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Get the accumulation of resources allocated to the tracked containers.
+   * @return ResourceUtilization Resource Utilization.
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * Add Container's resources to the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void addContainerResources(Container container) {
+    ContainersMonitor.increaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Subtract Container's resources from the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void subtractContainerResource(Container container) {
+    ContainersMonitor.decreaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Check if NM has resources available currently to run the container.
+   * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
+   */
+  @Override
+  public boolean hasResourcesAvailable(Container container) {
+    long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
+    return hasResourcesAvailable(pMemBytes,
+        (long) (getContainersMonitor().getVmemRatio() * pMemBytes),
+        container.getResource().getVirtualCores());
+  }
+
+  /** Check the asked amounts (memory in bytes) against the NM's limits. */
+  private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
+      int cpuVcores) {
+    // Check physical memory.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
+          this.containersAllocation.getPhysicalMemory(),
+          (pMemBytes >> 20),
+          (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
+    }
+    // Compare MB values in long arithmetic; narrowing casts could truncate.
+    if (this.containersAllocation.getPhysicalMemory() + (pMemBytes >> 20) >
+        (getContainersMonitor().getPmemAllocatedForContainers() >> 20)) {
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before vMemCheck" +
+              "[isEnabled={}, current={} + asked={} > allowed={}]",
+          getContainersMonitor().isVmemCheckEnabled(),
+          this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
+          (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
+    }
+    // Check virtual memory.
+    if (getContainersMonitor().isVmemCheckEnabled() &&
+        this.containersAllocation.getVirtualMemory() + (vMemBytes >> 20) >
+            (getContainersMonitor().getVmemAllocatedForContainers() >> 20)) {
+      return false;
+    }
+
+    // "current" is a fraction of the allowed cores; "asked" is a vcore count.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before cpuCheck [current={} + asked={} > allowed={}]",
+          this.containersAllocation.getCPU(), cpuVcores,
+          getContainersMonitor().getVCoresAllocatedForContainers());
+    }
+    // Check CPU. Compare using integral values of cores to avoid decimal
+    // inaccuracies.
+    return hasEnoughCpu(this.containersAllocation.getCPU(),
+        getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores);
+  }
+
+  /**
+   * Returns whether there is enough space for coresRequested in totalCores.
+   * Converts currentAllocation usage to nearest integer count before comparing,
+   * as floats are inherently imprecise. NOTE: this calculation assumes that
+   * requested core counts must be integers, and currentAllocation core count
+   * must also be an integer.
+   *
+   * @param currentAllocation The current allocation, a float value from 0 to 1.
+   * @param totalCores The total cores in the system.
+   * @param coresRequested The number of cores requested.
+   * @return True if round(currentAllocation * totalCores) + coresRequested
+   *         &lt;= totalCores.
+   */
+  public boolean hasEnoughCpu(float currentAllocation, long totalCores,
+      int coresRequested) {
+    // Must not cast here, as it would truncate the decimal digits.
+    return Math.round(currentAllocation * totalCores)
+        + coresRequested <= totalCores;
+  }
+
+  /**
+   * Get the monitor that supplies the limits this tracker checks against.
+   * @return the containers monitor held by the scheduler.
+   */
+  public ContainersMonitor getContainersMonitor() {
+    return this.scheduler.getContainersMonitor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index e8341c9..d9b713f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -43,7 +42,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,10 +74,6 @@ public class ContainerScheduler extends AbstractService implements
   // Queue of Guaranteed Containers waiting for resources to run
   private final LinkedHashMap<ContainerId, Container>
       queuedGuaranteedContainers = new LinkedHashMap<>();
-  // sum of the resources requested by guaranteed containers in queue
-  private final Resource guaranteedResourcesDemanded =
-      Resource.newInstance(0, 0);
-
   // Queue of Opportunistic Containers waiting for resources to run
   private final LinkedHashMap<ContainerId, Container>
       queuedOpportunisticContainers = new LinkedHashMap<>();
@@ -88,10 +82,6 @@ public class ContainerScheduler extends AbstractService implements
   // or paused to make room for a guaranteed container.
   private final Map<ContainerId, Container> oppContainersToKill =
       new HashMap<>();
-  // sum of the resources to be released by opportunistic containers that
-  // have been marked to be killed or paused.
-  private final Resource opportunisticResourcesToBeReleased =
-      Resource.newInstance(0, 0);
 
   // Containers launched by the Scheduler will take a while to actually
   // move to the RUNNING state, but should still be fair game for killing
@@ -129,17 +119,6 @@ public class ContainerScheduler extends AbstractService implements
             DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
   }
 
-  @VisibleForTesting
-  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
-      NodeManagerMetrics metrics, int qLength) {
-    super(ContainerScheduler.class.getName());
-    this.context = context;
-    this.dispatcher = dispatcher;
-    this.metrics = metrics;
-    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
-    this.opportunisticContainersStatus =
-        OpportunisticContainersStatus.newInstance();
-  }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
@@ -149,16 +128,20 @@ public class ContainerScheduler extends AbstractService implements
             YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
             YarnConfiguration.
                 DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
-    // We assume over allocation configurations have been initialized
-    this.utilizationTracker = getResourceTracker();
   }
 
-  private AllocationBasedResourceTracker getResourceTracker() {
-    if (context.isOverAllocationEnabled()) {
-      return new UtilizationBasedResourceTracker(this);
-    } else {
-      return new AllocationBasedResourceTracker(this);
-    }
+  @VisibleForTesting
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics, int qLength) {
+    super(ContainerScheduler.class.getName());
+    this.context = context;
+    this.dispatcher = dispatcher;
+    this.metrics = metrics;
+    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+    this.utilizationTracker =
+        new AllocationBasedResourceUtilizationTracker(this);
+    this.opportunisticContainersStatus =
+        OpportunisticContainersStatus.newInstance();
   }
 
   /**
@@ -181,18 +164,16 @@ public class ContainerScheduler extends AbstractService implements
       if (event instanceof UpdateContainerSchedulerEvent) {
         onUpdateContainer((UpdateContainerSchedulerEvent) event);
       } else {
         LOG.error("Unknown event type on UpdateContainer: " + event.getType());
       }
       break;
     case SHED_QUEUED_CONTAINERS:
       shedQueuedOpportunisticContainers();
       break;
     case RECOVERY_COMPLETED:
-      startPendingContainers(false);
-      break;
-    case SCHEDULE_CONTAINERS:
-      startPendingContainers(true);
+      startPendingContainers(maxOppQueueLength <= 0);
+      // Break here so a completed recovery does not fall through to default.
       break;
     default:
       LOG.error("Unknown event arrived at ContainerScheduler: "
           + event.toString());
@@ -207,10 +186,10 @@ public class ContainerScheduler extends AbstractService implements
     ContainerId containerId = updateEvent.getContainer().getContainerId();
     if (updateEvent.isResourceChange()) {
       if (runningContainers.containsKey(containerId)) {
-        this.utilizationTracker.containerReleased(
+        this.utilizationTracker.subtractContainerResource(
             new ContainerImpl(getConfig(), null, null, null, null,
                 updateEvent.getOriginalToken(), context));
-        this.utilizationTracker.containerLaunched(
+        this.utilizationTracker.addContainerResources(
             updateEvent.getContainer());
         getContainersMonitor().handle(
             new ChangeMonitoringContainerResourceEvent(containerId,
@@ -226,22 +205,20 @@ public class ContainerScheduler extends AbstractService implements
         if (queuedOpportunisticContainers.remove(containerId) != null) {
           queuedGuaranteedContainers.put(containerId,
               updateEvent.getContainer());
-          Resources.addTo(guaranteedResourcesDemanded,
-              updateEvent.getContainer().getResource());
-          startPendingContainers(true);
+          // Kill/pause opportunistic containers, if any, to make room for
+          // the promotion request.
+          reclaimOpportunisticContainerResources(updateEvent.getContainer());
         }
       } else {
         // Demotion of queued container.. Should not happen too often
         // since you should not find too many queued guaranteed
         // containers
         if (queuedGuaranteedContainers.remove(containerId) != null) {
-          Resources.subtractFrom(guaranteedResourcesDemanded,
-              updateEvent.getContainer().getResource());
           queuedOpportunisticContainers.put(containerId,
               updateEvent.getContainer());
-          startPendingContainers(false);
         }
       }
+      startPendingContainers(maxOppQueueLength <= 0);
     }
   }
 
@@ -259,7 +236,6 @@ public class ContainerScheduler extends AbstractService implements
         || rcs == RecoveredContainerStatus.PAUSED) {
       if (execType == ExecutionType.GUARANTEED) {
         queuedGuaranteedContainers.put(container.getContainerId(), container);
-        Resources.addTo(guaranteedResourcesDemanded, container.getResource());
       } else if (execType == ExecutionType.OPPORTUNISTIC) {
         queuedOpportunisticContainers
             .put(container.getContainerId(), container);
@@ -270,7 +246,7 @@ public class ContainerScheduler extends AbstractService implements
       }
     } else if (rcs == RecoveredContainerStatus.LAUNCHED) {
       runningContainers.put(container.getContainerId(), container);
-      utilizationTracker.containerLaunched(container);
+      utilizationTracker.addContainerResources(container);
     }
   }
 
@@ -330,107 +306,65 @@ public class ContainerScheduler extends AbstractService implements
   }
 
   private void onResourcesReclaimed(Container container) {
-    ContainerId containerId = container.getContainerId();
+    oppContainersToKill.remove(container.getContainerId());
 
     // This could be killed externally for eg. by the ContainerManager,
     // in which case, the container might still be queued.
-    if (queuedOpportunisticContainers.remove(containerId) != null) {
-      return;
-    }
-
-    // This could be killed externally for eg. by the ContainerManager,
-    // in which case, the container might still be queued.
-    if (queuedGuaranteedContainers.remove(containerId) != null) {
-      Resources.addTo(guaranteedResourcesDemanded, container.getResource());
-      return;
-    }
-
-    if (oppContainersToKill.remove(containerId) != null) {
-      Resources.subtractFrom(
-          opportunisticResourcesToBeReleased, container.getResource());
+    Container queued =
+        queuedOpportunisticContainers.remove(container.getContainerId());
+    if (queued == null) {
+      queuedGuaranteedContainers.remove(container.getContainerId());
     }
 
     // Requeue PAUSED containers
     if (container.getContainerState() == ContainerState.PAUSED) {
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.put(containerId, container);
-        Resources.addTo(guaranteedResourcesDemanded, container.getResource());
+        queuedGuaranteedContainers.put(container.getContainerId(), container);
       } else {
-        queuedOpportunisticContainers.put(containerId, container);
+        queuedOpportunisticContainers.put(
+            container.getContainerId(), container);
       }
     }
     // decrement only if it was a running container
-    Container completedContainer = runningContainers.remove(containerId);
+    Container completedContainer = runningContainers.remove(container
+        .getContainerId());
     // only a running container releases resources upon completion
     boolean resourceReleased = completedContainer != null;
     if (resourceReleased) {
-      this.utilizationTracker.containerReleased(container);
+      this.utilizationTracker.subtractContainerResource(container);
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.OPPORTUNISTIC) {
         this.metrics.completeOpportunisticContainer(container.getResource());
       }
-
-      // In case of over-allocation being turned on, we may need to reclaim
-      // more resources since the opportunistic containers that have been
-      // killed or paused may have not released as much resource as we need.
-      boolean reclaimOpportunisticResources = context.isOverAllocationEnabled();
-      startPendingContainers(reclaimOpportunisticResources);
+      boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+      startPendingContainers(forceStartGuaranteedContainers);
     }
   }
 
   /**
    * Start pending containers in the queue.
-   * @param reclaimOpportunisticResources if set to true, resources allocated
-   *                  to running OPPORTUNISTIC containers will be reclaimed in
-   *                  cases where there are GUARANTEED containers being queued
+   * @param forceStartGuaranteedContaieners When this is true, start guaranteed
+   *        containers without looking at available resources
    */
-  private void startPendingContainers(boolean reclaimOpportunisticResources) {
-    // When opportunistic container not allowed (which is determined by
-    // max-queue length of pending opportunistic containers <= 0), start
-    // guaranteed containers without looking at available resources and
-    // skip scanning the queue of opportunistic containers
-    if (maxOppQueueLength <= 0) {
-      forcefullyStartGuaranteedContainers();
-      return;
-    }
-
-    Resource available = utilizationTracker.getAvailableResources();
-
-    // Start guaranteed containers that are queued, if resources available.
-    boolean allGuaranteedContainersLaunched =
-        startGuaranteedContainers(available);
-    // Start opportunistic containers, if resources available, which is true
-    // if all guaranteed containers in queue have been launched.
-    if (allGuaranteedContainersLaunched) {
-      startOpportunisticContainers(available);
-    } else {
-      // If not all guaranteed containers in queue are launched, we may need
-      // to reclaim resources from opportunistic containers that are running.
-      if (reclaimOpportunisticResources) {
-        reclaimOpportunisticContainerResources();
-      }
+  private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
+    // Start queued/paused guaranteed containers, if resources are available.
+    boolean resourcesAvailable = startContainers(
+          queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+    // Start opportunistic containers, if resources available.
+    if (resourcesAvailable) {
+      startContainers(queuedOpportunisticContainers.values(), false);
     }
   }
 
-  /**
-   * Try to launch as many GUARANTEED containers as possible.
-   * @param available the amount of resources available to launch containers
-   * @return true if all queued GUARANTEED containers are launched
-   *              or there is no GUARANTEED containers to launch
-   */
-  private boolean startGuaranteedContainers(Resource available) {
-    Iterator<Container> cIter =
-        queuedGuaranteedContainers.values().iterator();
+  private boolean startContainers(
+      Collection<Container> containersToBeStarted, boolean force) {
+    Iterator<Container> cIter = containersToBeStarted.iterator();
     boolean resourcesAvailable = true;
     while (cIter.hasNext() && resourcesAvailable) {
       Container container = cIter.next();
-      if (isResourceAvailable(available, container)) {
-        startContainer(container);
-        Resources.subtractFrom(available, container.getResource());
+      if (tryStartContainer(container, force)) {
         cIter.remove();
-        Resources.subtractFrom(
-            guaranteedResourcesDemanded, container.getResource());
       } else {
         resourcesAvailable = false;
       }
@@ -438,49 +372,25 @@ public class ContainerScheduler extends AbstractService implements
     return resourcesAvailable;
   }
 
-  /**
-   * Launch all queued GUARANTEED containers without checking resource
-   * availability. This is an optimization in cases where OPPORTUNISTIC
-   * containers are not allowed on the node.
-   */
-  private void forcefullyStartGuaranteedContainers() {
-    Iterator<Container> cIter =
-        queuedGuaranteedContainers.values().iterator();
-    while (cIter.hasNext()) {
-      Container container = cIter.next();
+  private boolean tryStartContainer(Container container, boolean force) {
+    boolean containerStarted = false;
+    // call startContainer without checking available resource when force==true
+    if (force || resourceAvailableToStartContainer(
+        container)) {
       startContainer(container);
-      cIter.remove();
-      Resources.subtractFrom(
-          guaranteedResourcesDemanded, container.getResource());
+      containerStarted = true;
     }
+    return containerStarted;
   }
+
   /**
-   * Try to launch as many OPPORTUNISTIC containers as possible.
-   * @param available the amount of resources available to launch containers
-   * @return true if all OPPORTUNISTIC containers are launched
-   *              or there is no OPPORTUNISTIC containers to launch
+   * Check if there is resource available to start a given container
+   * immediately. (This can be extended to include overallocated resources)
+   * @param container the container to start
+   * @return true if container can be launched directly
    */
-  private boolean startOpportunisticContainers(Resource available) {
-    Iterator<Container> cIter =
-        queuedOpportunisticContainers.values().iterator();
-    boolean resourcesAvailable = true;
-    while (cIter.hasNext() && resourcesAvailable) {
-      Container container = cIter.next();
-      if (isResourceAvailable(available, container)) {
-        startContainer(container);
-        Resources.subtractFrom(available, container.getResource());
-        cIter.remove();
-      } else {
-        resourcesAvailable = false;
-      }
-    }
-    return resourcesAvailable;
-  }
-
-  private static boolean isResourceAvailable(
-      Resource resource, Container container) {
-    Resource left = Resources.subtract(resource, container.getResource());
-    return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0;
+  private boolean resourceAvailableToStartContainer(Container container) {
+    return this.utilizationTracker.hasResourcesAvailable(container);
   }
 
   private boolean enqueueContainer(Container container) {
@@ -490,7 +400,6 @@ public class ContainerScheduler extends AbstractService implements
     boolean isQueued;
     if (isGuaranteedContainer) {
       queuedGuaranteedContainers.put(container.getContainerId(), container);
-      Resources.addTo(guaranteedResourcesDemanded, container.getResource());
       isQueued = true;
     } else {
       if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
@@ -535,7 +444,18 @@ public class ContainerScheduler extends AbstractService implements
     // enough number of opportunistic containers.
     if (isGuaranteedContainer) {
       enqueueContainer(container);
-      startPendingContainers(true);
+
+      // When opportunistic container not allowed (which is determined by
+      // max-queue length of pending opportunistic containers <= 0), start
+      // guaranteed containers without looking at available resources.
+      boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+      startPendingContainers(forceStartGuaranteedContainers);
+
+      // If the guaranteed container is still queued, we need to preempt
+      // opportunistic containers to make room for it.
+      if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
+        reclaimOpportunisticContainerResources(container);
+      }
     } else {
       // Given an opportunistic container, we first try to start as many queuing
       // guaranteed containers as possible followed by queuing opportunistic
@@ -553,19 +473,19 @@ public class ContainerScheduler extends AbstractService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void reclaimOpportunisticContainerResources() {
+  private void reclaimOpportunisticContainerResources(Container container) {
     List<Container> extraOppContainersToReclaim =
-        pickOpportunisticContainersToReclaimResources();
-    killOpportunisticContainers(extraOppContainersToReclaim);
-  }
-
-  private void killOpportunisticContainers(
-      Collection<Container> containersToReclaim) {
-    for (Container contToReclaim : containersToReclaim) {
+        pickOpportunisticContainersToReclaimResources(
+            container.getContainerId());
+    // Kill the opportunistic containers that were chosen.
+    for (Container contToReclaim : extraOppContainersToReclaim) {
       String preemptionAction = usePauseEventForPreemption == true ? "paused" :
-          "preempted";
-      LOG.info("Container {} will be {} to start the execution of guaranteed" +
-              " containers.", contToReclaim.getContainerId(), preemptionAction);
+          "resumed";
+      LOG.info(
+          "Container {} will be {} to start the "
+              + "execution of guaranteed container {}.",
+          contToReclaim.getContainerId(), preemptionAction,
+          container.getContainerId());
 
       if (usePauseEventForPreemption) {
         contToReclaim.sendPauseEvent(
@@ -576,15 +496,13 @@ public class ContainerScheduler extends AbstractService implements
             "Container Killed to make room for Guaranteed Container.");
       }
       oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
-      Resources.addTo(
-          opportunisticResourcesToBeReleased, contToReclaim.getResource());
     }
   }
 
   private void startContainer(Container container) {
     LOG.info("Starting container [" + container.getContainerId()+ "]");
     runningContainers.put(container.getContainerId(), container);
-    this.utilizationTracker.containerLaunched(container);
+    this.utilizationTracker.addContainerResources(container);
     if (container.getContainerTokenIdentifier().getExecutionType() ==
         ExecutionType.OPPORTUNISTIC) {
       this.metrics.startOpportunisticContainer(container.getResource());
@@ -592,12 +510,14 @@ public class ContainerScheduler extends AbstractService implements
     container.sendLaunchEvent();
   }
 
-  private List<Container> pickOpportunisticContainersToReclaimResources() {
+  private List<Container> pickOpportunisticContainersToReclaimResources(
+      ContainerId containerToStartId) {
     // The opportunistic containers that need to be killed for the
     // given container to start.
     List<Container> extraOpportContainersToKill = new ArrayList<>();
     // Track resources that need to be freed.
-    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+        containerToStartId);
 
     // Go over the running opportunistic containers.
     // Use a descending iterator to kill more recently started containers.
@@ -616,19 +536,15 @@ public class ContainerScheduler extends AbstractService implements
           continue;
         }
         extraOpportContainersToKill.add(runningCont);
-        // In the case of over-allocation, the running container may not
-        // release as much resources as it has requested, but we'll check
-        // again if more containers need to be killed/paused when this
-        // container is released.
         ContainersMonitor.decreaseResourceUtilization(
             getContainersMonitor(), resourcesToFreeUp,
             runningCont.getResource());
       }
     }
     if (!hasSufficientResources(resourcesToFreeUp)) {
-      LOG.warn("There are no sufficient resources to start guaranteed" +
-          " containers at the moment. Opportunistic containers are in" +
-          " the process of being killed to make room.");
+      LOG.warn("There are no sufficient resources to start guaranteed [{}]" +
+          "at the moment. Opportunistic containers are in the process of" +
+          "being killed to make room.", containerToStartId);
     }
     return extraOpportContainersToKill;
   }
@@ -643,42 +559,34 @@ public class ContainerScheduler extends AbstractService implements
             * getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
   }
 
-  /**
-   * Determine how much resources are needed to be freed up to launch the given
-   * GUARANTEED container. Used to determine how many running OPPORTUNISTIC
-   * containers need to be killed/paused, assuming OPPORTUNISTIC containers to
-   * be killed/paused will release the amount of resources they have requested.
-   *
-   * If the node is over-allocating itself, this may cause not enough
-   * OPPORTUNISTIC containers being killed/paused in cases where the running
-   * OPPORTUNISTIC containers are not consuming fully their resource requests.
-   * We'd check again upon container completion events to see if more running
-   * OPPORTUNISTIC containers need to be killed/paused.
-   *
-   * @return the amount of resource needed to be reclaimed for this container
-   */
-  private ResourceUtilization resourcesToFreeUp() {
+  private ResourceUtilization resourcesToFreeUp(
+      ContainerId containerToStartId) {
     // Get allocation of currently allocated containers.
     ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
-        .newInstance(0, 0, 0.0f);
-
-    // Add to the allocation the allocation of pending guaranteed containers.
-    ContainersMonitor.increaseResourceUtilization(getContainersMonitor(),
-        resourceAllocationToFreeUp, guaranteedResourcesDemanded);
+        .newInstance(this.utilizationTracker.getCurrentUtilization());
+
+    // Add to the allocation the allocation of the pending guaranteed
+    // containers that will start before the current container will be started.
+    for (Container container : queuedGuaranteedContainers.values()) {
+      ContainersMonitor.increaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
+      if (container.getContainerId().equals(containerToStartId)) {
+        break;
+      }
+    }
 
     // These resources are being freed, likely at the behest of another
     // guaranteed container..
-    ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
-        resourceAllocationToFreeUp, opportunisticResourcesToBeReleased);
-
-    // Deduct any remaining resources available
-    Resource availableResources = utilizationTracker.getAvailableResources();
-    if (availableResources.getVirtualCores() > 0 &&
-        availableResources.getMemorySize() > 0) {
-      ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(),
-          resourceAllocationToFreeUp, availableResources);
+    for (Container container : oppContainersToKill.values()) {
+      ContainersMonitor.decreaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
     }
 
+    // Subtract the overall node resources.
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        resourceAllocationToFreeUp);
     return resourceAllocationToFreeUp;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 9ad4f91..294eddf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -28,7 +28,5 @@ public enum ContainerSchedulerEventType {
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
   CONTAINER_PAUSED,
-  RECOVERY_COMPLETED,
-  // Producer: Containers Monitor when over-allocation is on
-  SCHEDULE_CONTAINERS
+  RECOVERY_COMPLETED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
deleted file mode 100644
index 58b73d2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * Keeps track of containers utilization over time and determines how much
- * resources are available to launch containers when over-allocation is on.
- */
-public abstract class NMAllocationPolicy {
-  protected final ResourceThresholds overAllocationThresholds;
-  protected final ContainersMonitor containersMonitor;
-
-  public NMAllocationPolicy(
-      ResourceThresholds overAllocationThresholds,
-      ContainersMonitor containersMonitor) {
-    this.containersMonitor = containersMonitor;
-    this.overAllocationThresholds = overAllocationThresholds;
-  }
-
-  /**
-   * Handle container launch events.
-   * @param container the container that has been launched
-   */
-  public void containerLaunched(Container container) {
-
-  }
-
-  /**
-   * Handle container release events.
-   * @param container the container that has been released
-   */
-  public void containerReleased(Container container) {
-
-  }
-
-  /**
-   * Get the amount of resources to launch containers when
-   * over-allocation is turned on.
-   * @return the amount of resources available to launch containers
-   */
-  public abstract Resource getAvailableResources();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 98d99c6..3c17eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
@@ -39,20 +38,22 @@ public interface ResourceUtilizationTracker {
   ResourceUtilization getCurrentUtilization();
 
   /**
-   * Get the amount of resources currently available to launch containers.
-   * @return Resource resources available to launch containers
+   * Add Container's resources to Node Utilization.
+   * @param container Container.
    */
-  Resource getAvailableResources();
+  void addContainerResources(Container container);
 
   /**
-   * Add Container's resources to Node Utilization upon container launch.
+   * Subtract Container's resources from Node Utilization.
    * @param container Container.
    */
-  void containerLaunched(Container container);
+  void subtractContainerResource(Container container);
 
   /**
-   * Subtract Container's resources to Node Utilization upon container release.
+   * Check if NM has resources available currently to run the container.
    * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
    */
-  void containerReleased(Container container);
+  boolean hasResourcesAvailable(Container container);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
deleted file mode 100644
index f486506..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-
-/**
- * An implementation of NMAllocationPolicy based on the
- * snapshot of the latest containers utilization to determine how much
- * resources are available * to launch containers when over-allocation
- * is turned on.
- */
-public class SnapshotBasedOverAllocationPolicy
-    extends NMAllocationPolicy {
-
-  public SnapshotBasedOverAllocationPolicy(
-      ResourceThresholds overAllocationThresholds,
-      ContainersMonitor containersMonitor) {
-    super(overAllocationThresholds, containersMonitor);
-  }
-
-  @Override
-  public Resource getAvailableResources() {
-    ResourceUtilization utilization =
-        containersMonitor.getContainersUtilization(true).getUtilization();
-    long memoryAvailable = Math.round(
-        overAllocationThresholds.getMemoryThreshold() *
-            containersMonitor.getPmemAllocatedForContainers()) -
-        (utilization.getPhysicalMemory() << 20);
-    int vcoreAvailable = Math.round(
-        (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) *
-            containersMonitor.getVCoresAllocatedForContainers());
-    return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
deleted file mode 100644
index 6f9bc82..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-* An resource availability tracker that determines if there are resources
-* available based on if there are unallocated resources or if there are
-* un-utilized resources.
-*/
-public class UtilizationBasedResourceTracker
-    extends AllocationBasedResourceTracker {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AllocationBasedResourceTracker.class);
-
-  private final NMAllocationPolicy overAllocationPolicy;
-
-  UtilizationBasedResourceTracker(ContainerScheduler scheduler) {
-    super(scheduler);
-    this.overAllocationPolicy =
-        getContainersMonitor().getContainerOverAllocationPolicy();
-  }
-
-  @Override
-  public void containerLaunched(Container container) {
-    super.containerLaunched(container);
-    if (overAllocationPolicy != null) {
-      overAllocationPolicy.containerLaunched(container);
-    }
-  }
-
-  @Override
-  public void containerReleased(Container container) {
-    super.containerReleased(container);
-    if (overAllocationPolicy != null) {
-      overAllocationPolicy.containerReleased(container);
-    }
-  }
-
-  @Override
-  public Resource getAvailableResources() {
-    Resource resourceBasedOnAllocation = getUnallocatedResources();
-    Resource resourceBasedOnUtilization =
-        getResourcesAvailableBasedOnUtilization();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The amount of resources available based on allocation is " +
-          resourceBasedOnAllocation + ", based on utilization is " +
-          resourceBasedOnUtilization);
-    }
-
-    return Resources.componentwiseMax(resourceBasedOnAllocation,
-        resourceBasedOnUtilization);
-  }
-
-  /**
-   * Get the amount of resources based on the slack between
-   * the actual utilization and desired utilization.
-   * @return Resource resource available
-   */
-  private Resource getResourcesAvailableBasedOnUtilization() {
-    if (overAllocationPolicy == null) {
-      return Resources.none();
-    }
-
-    return overAllocationPolicy.getAvailableResources();
-  }
-
-  @Override
-  public ResourceUtilization getCurrentUtilization() {
-    return getContainersMonitor().getContainersUtilization(false)
-        .getUtilization();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 05e9dd0..93d0afb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 
-import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -346,40 +345,6 @@ public abstract class BaseContainerManagerTest {
           fStates.contains(containerStatus.getState()));
   }
 
-  public static void waitForContainerSubState(
-      ContainerManagementProtocol containerManager, ContainerId containerID,
-      ContainerSubState finalState)
-      throws InterruptedException, YarnException, IOException {
-    waitForContainerSubState(containerManager, containerID,
-        Arrays.asList(finalState), 20);
-  }
-  public static void waitForContainerSubState(
-      ContainerManagementProtocol containerManager, ContainerId containerID,
-      List<ContainerSubState> finalStates, int timeOutMax)
-      throws InterruptedException, YarnException, IOException {
-    List<ContainerId> list = new ArrayList<>();
-    list.add(containerID);
-    GetContainerStatusesRequest request =
-        GetContainerStatusesRequest.newInstance(list);
-    ContainerStatus containerStatus;
-    HashSet<ContainerSubState> fStates = new HashSet<>(finalStates);
-    int timeoutSecs = 0;
-    do {
-      Thread.sleep(1000);
-      containerStatus =
-          containerManager.getContainerStatuses(request)
-              .getContainerStatuses().get(0);
-      LOG.info("Waiting for container to get into one of states " + fStates
-          + ". Current state is " + containerStatus.getContainerSubState());
-      timeoutSecs += 1;
-    } while (!fStates.contains(containerStatus.getContainerSubState())
-        && timeoutSecs < timeOutMax);
-    LOG.info("Container state is " + containerStatus.getContainerSubState());
-    Assert.assertTrue("ContainerSubState is not correct (timedout)",
-        fStates.contains(containerStatus.getContainerSubState()));
-  }
-
-
   public static void waitForApplicationState(
       ContainerManagerImpl containerManager, ApplicationId appID,
       ApplicationState finalState)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 21d1889..d7d826c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -282,7 +282,7 @@ public class TestContainersMonitorResourceChange {
     // will be 0.
     assertEquals(
         "Resource utilization must be default with MonitorThread's first run",
-        0, containersMonitor.getContainersUtilization(false).getUtilization()
+        0, containersMonitor.getContainersUtilization()
             .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
 
     // Verify the container utilization value. Since atleast one round is done,
@@ -297,9 +297,8 @@ public class TestContainersMonitorResourceChange {
       ContainersMonitorImpl containersMonitor, int timeoutMsecs)
       throws InterruptedException {
     int timeWaiting = 0;
-    while (0 == containersMonitor.getContainersUtilization(false)
-        .getUtilization().compareTo(
-            ResourceUtilization.newInstance(0, 0, 0.0f))) {
+    while (0 == containersMonitor.getContainersUtilization()
+        .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) {
       if (timeWaiting >= timeoutMsecs) {
         break;
       }
@@ -311,7 +310,7 @@ public class TestContainersMonitorResourceChange {
     }
 
     assertTrue("Resource utilization is not changed from second run onwards",
-        0 != containersMonitor.getContainersUtilization(false).getUtilization()
+        0 != containersMonitor.getContainersUtilization()
             .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f)));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
deleted file mode 100644
index 1e8bfdf..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the {@link AllocationBasedResourceTracker} class.
- */
-public class TestAllocationBasedResourceTracker {
-
-  private ContainerScheduler mockContainerScheduler;
-
-  @Before
-  public void setup() {
-    mockContainerScheduler = mock(ContainerScheduler.class);
-    ContainersMonitor containersMonitor =
-        new ContainersMonitorImpl(mock(ContainerExecutor.class),
-            mock(AsyncDispatcher.class), mock(Context.class));
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
-    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
-    conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
-    conf.setInt(YarnConfiguration.NM_VCORES, 8);
-    containersMonitor.init(conf);
-    when(mockContainerScheduler.getContainersMonitor())
-        .thenReturn(containersMonitor);
-  }
-
-  /**
-   * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the
-   * hasResourceAvailable should return false.
-   */
-  @Test
-  public void testHasResourcesAvailable() {
-    AllocationBasedResourceTracker tracker =
-        new AllocationBasedResourceTracker(mockContainerScheduler);
-    Container testContainer = mock(Container.class);
-    when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
-    for (int i = 0; i < 2; i++) {
-      Assert.assertTrue(
-          isResourcesAvailable(tracker.getAvailableResources(), testContainer));
-      tracker.containerLaunched(testContainer);
-    }
-    Assert.assertFalse(
-        isResourcesAvailable(tracker.getAvailableResources(), testContainer));
-  }
-
-  private static boolean isResourcesAvailable(
-      Resource available, Container container) {
-    return available.compareTo(container.getResource()) >= 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9a7055b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
new file mode 100644
index 0000000..82c2147
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link AllocationBasedResourceUtilizationTracker} class.
+ */
+public class TestAllocationBasedResourceUtilizationTracker {
+
+  private ContainerScheduler mockContainerScheduler;
+
+  @Before
+  public void setup() {
+    mockContainerScheduler = mock(ContainerScheduler.class);
+    ContainersMonitor containersMonitor =
+        new ContainersMonitorImpl(mock(ContainerExecutor.class),
+            mock(AsyncDispatcher.class), mock(Context.class));
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f);
+    conf.setInt(YarnConfiguration.NM_VCORES, 8);
+    containersMonitor.init(conf);
+    when(mockContainerScheduler.getContainersMonitor())
+        .thenReturn(containersMonitor);
+  }
+
+  /**
+   * Node has capacity for 1024 MB and 8 cores. Saturate the node with two
+   * 512 MB / 4 core containers; hasResourcesAvailable must then return false.
+   */
+  @Test
+  public void testHasResourcesAvailable() {
+    AllocationBasedResourceUtilizationTracker tracker =
+        new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+    Container testContainer = mock(Container.class);
+    when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
+      tracker.addContainerResources(testContainer);
+    }
+    Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
+  }
+
+  /**
+   * Test the case where the current allocation is the inexact float value
+   * 0.8888891 (about 8/9 cores used). Requesting the 1 remaining core must
+   * pass hasEnoughCpu; requesting 2 cores must fail.
+   */
+  @Test
+  public void testHasEnoughCpu() {
+    AllocationBasedResourceUtilizationTracker tracker =
+        new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
+    float currentAllocation = 0.8888891f;
+    long totalCores = 9;
+    int alreadyUsedCores = 8;
+    Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
+        (int) totalCores - alreadyUsedCores));
+    Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
+        (int) totalCores - alreadyUsedCores + 1));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org