You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/09/24 19:08:11 UTC

[49/50] [abbrv] hadoop git commit: YARN-6672. Add NM preemption of opportunistic containers when utilization goes high.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
index 183d868..a2d4aa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
@@ -20,12 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -36,11 +33,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ConfigurationException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
@@ -53,8 +47,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -78,8 +70,6 @@ import java.util.concurrent.ConcurrentHashMap;
 public class TestContainerSchedulerWithOverAllocation
     extends BaseContainerManagerTest {
   private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3;
-  private static final int NM_CONTAINERS_VCORES = 4;
-  private static final int NM_CONTAINERS_MEMORY_MB = 2048;
 
   static {
     LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
@@ -115,6 +105,11 @@ public class TestContainerSchedulerWithOverAllocation
     conf.setFloat(
         YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
         0.75f);
+    conf.setFloat(YarnConfiguration.NM_OVERALLOCATION_CPU_PREEMPTION_THRESHOLD,
+        0.8f);
+    conf.setFloat(
+        YarnConfiguration.NM_OVERALLOCATION_MEMORY_PREEMPTION_THRESHOLD, 0.8f);
+    conf.setInt(YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_CPU_COUNT, 2);
     // disable the monitor thread in ContainersMonitor to allow control over
     // when opportunistic containers are launched with over-allocation
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false);
@@ -134,9 +129,9 @@ public class TestContainerSchedulerWithOverAllocation
     StartContainersRequest allRequests = StartContainersRequest.newInstance(
         new ArrayList<StartContainerRequest>() { {
           add(createStartContainerRequest(0,
-              BuilderUtils.newResource(1024, 1), false));
+              BuilderUtils.newResource(1024, 1), ExecutionType.OPPORTUNISTIC));
           add(createStartContainerRequest(1,
-              BuilderUtils.newResource(1024, 1), true));
+              BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
         } }
     );
     containerManager.startContainers(allRequests);
@@ -171,9 +166,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), true));
+                BuilderUtils.newResource(824, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -191,11 +186,9 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC))
     ));
-
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
 
     // this container is not expected to be started immediately because
     // opportunistic containers cannot be started if the node would be
@@ -212,8 +205,8 @@ public class TestContainerSchedulerWithOverAllocation
     });
 
     // try to start opportunistic containers out of band.
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // this container is expected to be started immediately because there
     // are (memory: 1024, vcore: 0.625) available based on over-allocation
@@ -247,9 +240,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), true));
+                BuilderUtils.newResource(824, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -267,17 +260,14 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC))
     ));
 
     // try to start opportunistic containers out of band because they can
     // not be launched at container scheduler event if the node would be
     // over-allocated.
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
-
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // this container will not start immediately because there is not
     // enough resource available at the moment either in terms of
@@ -312,9 +302,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -331,11 +321,9 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false))
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC))
     ));
-
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
 
     // this container is not expected to be started immediately because
     // opportunistic containers cannot be started if the node would be
@@ -352,8 +340,8 @@ public class TestContainerSchedulerWithOverAllocation
     });
 
     // try to start opportunistic containers out of band.
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // this container is expected to be started because there is resources
     // available because the actual utilization is very low
@@ -384,9 +372,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -406,13 +394,11 @@ public class TestContainerSchedulerWithOverAllocation
     for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) {
       moreContainerRequests.add(
           createStartContainerRequest(2 + a,
-              BuilderUtils.newResource(512, 1), false));
+              BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
     }
     containerManager.startContainers(
         StartContainersRequest.newInstance(moreContainerRequests));
-
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
 
     // All OPPORTUNISTIC containers but the last one should be queued.
     // The last OPPORTUNISTIC container to launch should be killed.
@@ -454,9 +440,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1200, 1), true));
+                BuilderUtils.newResource(1200, 1), ExecutionType.GUARANTEED));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(400, 1), true));
+                BuilderUtils.newResource(400, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -473,7 +459,7 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(400, 1), false))
+                BuilderUtils.newResource(400, 1), ExecutionType.OPPORTUNISTIC))
     ));
 
     // the OPPORTUNISTIC container can be safely launched even though
@@ -507,9 +493,11 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
+                BuilderUtils.newResource(1024, 1),
+                ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), false));
+                BuilderUtils.newResource(824, 1),
+                ExecutionType.OPPORTUNISTIC));
           }
         }
     ));
@@ -527,7 +515,7 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), true))
+                BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED))
     ));
 
     // the GUARANTEED container is expected be launched immediately without
@@ -561,9 +549,11 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
+                BuilderUtils.newResource(1024, 1),
+                ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(824, 1), false));
+                BuilderUtils.newResource(824, 1),
+                ExecutionType.OPPORTUNISTIC));
           }
         }
     ));
@@ -581,7 +571,7 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), true))
+                BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED))
     ));
 
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -631,16 +621,18 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
+                BuilderUtils.newResource(1024, 1),
+                ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), false));
+                BuilderUtils.newResource(1024, 1),
+                ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(2,
-                BuilderUtils.newResource(1024, 1), false));
+                BuilderUtils.newResource(1024, 1),
+                ExecutionType.OPPORTUNISTIC));
           }
         }
     ));
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
 
     // Two OPPORTUNISTIC containers are expected to start with the
     // unallocated resources, but one will be queued because no
@@ -655,8 +647,8 @@ public class TestContainerSchedulerWithOverAllocation
     // try to start the opportunistic container out of band because it can
     // not be launched at container scheduler event if the node would be
     // over-allocated.
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // now the queued opportunistic container should also start
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -671,7 +663,7 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(3,
-                BuilderUtils.newResource(512, 1), true))
+                BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED))
     ));
 
     // the GUARANTEED container is expected be launched immediately without
@@ -690,83 +682,6 @@ public class TestContainerSchedulerWithOverAllocation
   }
 
   /**
-   * Start four OPPORTUNISTIC containers which in aggregates exceeds the
-   * capacity of the node. The real resource utilization of the first two
-   * OPPORTUNISTIC containers are high whereas that of the latter two are
-   * almost zero. Then try to start a GUARANTEED container. The GUARANTEED
-   * container will eventually start running after preempting the third
-   * and fourth OPPORTUNISTIC container (which releases no resources) and
-   * then the second OPPORTUNISTIC container.
-   */
-  public void
-      testKillOppContainersConservativelyWithOverallocationHighUtilization()
-          throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(3,
-                BuilderUtils.newResource(1024, 1), false));
-          }
-        }
-    ));
-    // All four GUARANTEED containers are all expected to start
-    // because the containers utilization is low (0 at the point)
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(3), ContainerSubState.RUNNING);
-
-    // the containers utilization is at the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
-    // try to start a GUARANTEED container when there's nothing left unallocated
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(4,
-                BuilderUtils.newResource(1024, 1), true))
-    ));
-
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(4), ContainerSubState.RUNNING);
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
-        newInstance(new ArrayList<ContainerId>() {
-          {
-            add(createContainerId(0));
-            add(createContainerId(1));
-            add(createContainerId(2));
-            add(createContainerId(3));
-            add(createContainerId(4));
-          }
-        });
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0)) ||
-          status.getContainerId().equals(createContainerId(4))) {
-        Assert.assertEquals(
-            ContainerSubState.RUNNING, status.getContainerSubState());
-      } else {
-        Assert.assertTrue(status.getDiagnostics().contains(
-            "Container Killed to make room for Guaranteed Container"));
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-  }
-
-  /**
    * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
    * which in aggregate exceeds the capacity of the node. The first two
    * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
@@ -784,11 +699,11 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(0,
-                BuilderUtils.newResource(512, 1), false));
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(512, 1), false));
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(2,
-                BuilderUtils.newResource(1024, 1), true));
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
           }
         }
     ));
@@ -810,9 +725,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(3,
-                BuilderUtils.newResource(512, 1), false));
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(4,
-                BuilderUtils.newResource(800, 1), false));
+                BuilderUtils.newResource(800, 1), ExecutionType.OPPORTUNISTIC));
           }
         }
     ));
@@ -831,8 +746,7 @@ public class TestContainerSchedulerWithOverAllocation
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
         createContainerId(2), ContainerSubState.DONE);
 
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
 
     // only one OPPORTUNISTIC container is start because no over-allocation
     // is allowed to start OPPORTUNISTIC containers at container finish event.
@@ -854,10 +768,8 @@ public class TestContainerSchedulerWithOverAllocation
     // now try to start the OPPORTUNISTIC container that was queued because
     // we don't start OPPORTUNISTIC containers at container finish event if
     // the node would be over-allocated
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
-    ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .drainAsyncEvents();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
         createContainerId(4), ContainerSubState.RUNNING);
     verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
@@ -884,7 +796,7 @@ public class TestContainerSchedulerWithOverAllocation
     containerManager.startContainers(StartContainersRequest.newInstance(
         Collections.singletonList(
             createStartContainerRequest(0,
-                BuilderUtils.newResource(2048, 4), true))));
+                BuilderUtils.newResource(2048, 4), ExecutionType.GUARANTEED))));
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
         createContainerId(0), ContainerSubState.RUNNING);
 
@@ -897,9 +809,9 @@ public class TestContainerSchedulerWithOverAllocation
         new ArrayList<StartContainerRequest>() {
           {
             add(createStartContainerRequest(1,
-                BuilderUtils.newResource(512, 1), false));
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
             add(createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false));
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
           }
         }
     ));
@@ -913,8 +825,8 @@ public class TestContainerSchedulerWithOverAllocation
         ResourceUtilization.newInstance(1536, 0, 1.0f/2));
 
     // try to start opportunistic containers out of band.
-    ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // no containers in queue are expected to be launched because the
     // containers utilization is not below the over-allocation threshold
@@ -934,8 +846,8 @@ public class TestContainerSchedulerWithOverAllocation
     setContainerResourceUtilization(
         ResourceUtilization.newInstance(512, 0, 1.0f/8));
 
-    ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
 
     // the two OPPORTUNISTIC containers are expected to be launched
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -952,11 +864,332 @@ public class TestContainerSchedulerWithOverAllocation
     });
   }
 
+  /**
+   * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED
+   * container and another OPPORTUNISTIC container in order. When the node
+   * memory utilization is over its preemption threshold, the two OPPORTUNISTIC
+   * containers should be killed.
+   */
+  @Test
+  public void testPreemptOpportunisticContainersUponHighMemoryUtilization()
+      throws Exception {
+    containerManager.start();
+
+    // try to start four containers at once. the first GUARANTEED container
+    // that requests (1024 MB, 1 vcore) can be launched because there is
+    // (2048 MB, 4 vcores) unallocated. The second container, which is
+    // OPPORTUNISTIC, can also be launched because it asks for 512 MB, 1 vcore
+    // which is less than what is left unallocated after launching the first
+    // one GUARANTEED container, (1024 MB, 3 vcores).
+    // The 3rd container, which is GUARANTEED, can also be launched because
+    // the node resource utilization is zero such that
+    // over-allocation kicks in. The 4th one, an OPPORTUNISTIC container,
+    // will be queued because OPPORTUNISTIC containers can only be
+    // launched when node resource utilization is checked, if launching them
+    // would cause node over-allocation.
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
+            add(createStartContainerRequest(2,
+                BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED));
+            add(createStartContainerRequest(3,
+                BuilderUtils.newResource(300, 1), ExecutionType.OPPORTUNISTIC));
+          }
+        }
+    ));
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
+
+    // the first three containers are all expected to start
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(2), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.SCHEDULED);
+
+    // try to check node resource utilization and start the second
+    // opportunistic container out of band. Because the node resource
+    // utilization is zero at the moment, over-allocation will kick in
+    // and the container will be launched.
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.RUNNING);
+
+    // the containers memory utilization is over the preemption threshold
+    // (2048 > 2048 * 0.8 = 1638.4)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 0.5f));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // (2048 - 2048 * 0.8) = 409.6 MB of memory needs to be reclaimed,
+    // which shall result in both OPPORTUNISTIC containers to be preempted.
+    // (Preempting the most recently launched OPPORTUNISTIC container, that
+    // is the 4th container, would only release at most 300 MB of memory)
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.DONE);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.DONE);
+
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.DONE);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+  }
+
+  /**
+   * Start a GUARANTEED container followed by an OPPORTUNISTIC container, which
+   * in aggregate do not take more than the capacity of the node.
+   * When the node memory utilization is above the preemption threshold, the
+   * OPPORTUNISTIC container should not be killed because the node is not being
+   * over-allocated.
+   */
+  @Test
+  public void testNoPreemptionUponHighMemoryUtilizationButNoOverallocation()
+      throws Exception {
+    containerManager.start();
+
+    // start two containers, one GUARANTEED and one OPPORTUNISTIC, that together
+    // take up all the allocations (2048 MB of memory and 4 vcores available on
+    // the node). They can be both launched immediately because there are enough
+    // allocations to do so. When the two containers fully utilize their
+    // resource requests, that is, the node is being 100% utilized, the
+    // OPPORTUNISTIC container shall continue to run because the node is
+    // not being over-allocated.
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(1024, 2),
+                ExecutionType.GUARANTEED));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(1024, 2),
+                ExecutionType.OPPORTUNISTIC));
+          }
+        }
+    ));
+    // both containers shall be launched immediately because there are
+    // enough allocations to do so
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+
+    // the node is being fully utilized, which is above the preemption
+    // thresholds (2048 * 0.8 = 1638.4 MB of memory, 0.8f CPU)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 1.0f));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // no containers shall be preempted because the node is not being
+    // over-allocated so it is safe to allow the node to be fully utilized
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+      }
+    });
+  }
+
+  /**
+   * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED
+   * container and another OPPORTUNISTIC container in order. When the node
+   * cpu utilization is over its preemption threshold a few times in a row,
+   * the two OPPORTUNISTIC containers should be killed one by one.
+   */
+  @Test
+  public void testPreemptionUponHighCPUUtilization() throws Exception {
+    containerManager.start();
+
+    // try to start 4 containers at once. The first container can be
+    // safely launched immediately (2048 MB, 4 vcores left unallocated).
+    // The second container can also be launched immediately, because
+    // there are enough resources unallocated after launching the first
+    // container (2048 - 512 = 1536 MB, 4 - 2 = 2 vcores). After launching
+    // the first two containers, there are 1024 MBs of memory and 1 vcore
+    // left unallocated, so there is not enough allocation to launch the
+    // third container. But because the third container is GUARANTEED and
+    // the node resource utilization is zero, we can launch it based on
+    // over-allocation (the projected resource utilization will be 512 MB
+    // of memory and 2 vcores, below the over-allocation threshold)
+    // The fourth container, which is OPPORTUNISTIC, will be queued because
+    // OPPORTUNISTIC containers can not be launched based on over-allocation
+    // upon container start requests (they can only be launched when node
+    // resource utilization is checked in ContainersMonitor)
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(512, 2), ExecutionType.GUARANTEED));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
+            add(createStartContainerRequest(2,
+                BuilderUtils.newResource(512, 2), ExecutionType.GUARANTEED));
+            add(createStartContainerRequest(3,
+                BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC));
+          }
+        }
+    ));
+    ((ContainerManagerForTest) containerManager).drainAsyncEvents();
+    // the first three containers are expected to start. The first two
+    // can be launched based on free allocation, the third can be
+    // launched based on over-allocation
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(2), ContainerSubState.RUNNING);
+
+    // try to start the second opportunistic container out of band.
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // the second opportunistic container is expected to start because
+    // the node resource utilization is at zero, the projected utilization
+    // is 512 MBs of memory and 1 vcore
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.RUNNING);
+
+    final float fullCpuUtilization = 1.0f;
+
+    // the containers CPU utilization is over its preemption threshold (0.8f)
+    // for the first time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // all containers should continue to be running because we don't
+    // preempt OPPORTUNISTIC containers right away
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.RUNNING);
+      }
+    });
+
+    // the containers CPU utilization is over its preemption threshold (0.8f)
+    // for the second time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // all containers should continue to be running because we don't preempt
+    // OPPORTUNISTIC containers when the cpu is over the preemption threshold
+    // (0.8f) the second time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.RUNNING);
+      }
+    });
+
+    // the containers CPU utilization is over the preemption threshold (0.8f)
+    // for the third time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // because CPU utilization is over its preemption threshold three times
+    // consecutively, the amount of cpu utilization over the preemption
+    // threshold, that is, 1.0 - 0.8 = 0.2f CPU needs to be reclaimed and
+    // as a result, the most recently launched OPPORTUNISTIC container should
+    // be killed
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.DONE);
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // again, the containers CPU utilization is over the preemption threshold
+    // (0.8f) for the first time (the cpu over-limit count is reset every time
+    // a preemption is triggered)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // no CPU resource is expected to be reclaimed when the CPU utilization
+    // goes over the preemption threshold the first time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers CPU utilization is over the preemption threshold (0.8f)
+    // for the second time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // still no CPU resource is expected to be reclaimed when the CPU
+    // utilization goes over the preemption threshold the second time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers CPU utilization is over the preemption threshold
+    // for the third time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((ContainerManagerForTest) containerManager)
+        .checkNodeResourceUtilization();
+
+    // because CPU utilization is over its preemption threshold three times
+    // consecutively, the amount of cpu utilization over the preemption
+    // threshold, that is, 1.0 - 0.8 = 0.2f CPU needs to be reclaimed and
+    // as a result, the other OPPORTUNISTIC container should be killed
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.DONE);
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.DONE);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+  }
+
 
   private void setContainerResourceUtilization(ResourceUtilization usage) {
-    ((ContainerMonitorForOverallocationTest)
-        containerManager.getContainersMonitor())
-            .setContainerResourceUsage(usage);
+    ((ContainerMonitorForTest) containerManager.getContainersMonitor())
+        .setContainerResourceUsage(usage);
   }
 
   private void allowContainerToSucceed(int containerId) {
@@ -965,12 +1198,11 @@ public class TestContainerSchedulerWithOverAllocation
   }
 
 
-  protected StartContainerRequest createStartContainerRequest(int containerId,
-      Resource resource, boolean isGuaranteed) throws IOException {
+  protected StartContainerRequest createStartContainerRequest(
+      int containerId, Resource resource, ExecutionType executionType)
+      throws IOException {
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED :
-        ExecutionType.OPPORTUNISTIC;
     Token containerToken = createContainerToken(
         createContainerId(containerId),
         DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
@@ -1004,9 +1236,7 @@ public class TestContainerSchedulerWithOverAllocation
    * container processes for testing purposes.
    */
   private static class LongRunningContainerSimulatingContainersManager
-      extends ContainerManagerImpl {
-
-    private final String user;
+      extends ContainerManagerForTest {
 
     LongRunningContainerSimulatingContainersManager(
         Context context, ContainerExecutor exec,
@@ -1014,27 +1244,8 @@ public class TestContainerSchedulerWithOverAllocation
         NodeStatusUpdater nodeStatusUpdater,
         NodeManagerMetrics metrics,
         LocalDirsHandlerService dirsHandler, String user) {
-      super(context, exec, deletionContext,
-          nodeStatusUpdater, metrics, dirsHandler);
-      this.user = user;
-    }
-
-    @Override
-    protected UserGroupInformation getRemoteUgi() throws YarnException {
-      ApplicationId appId = ApplicationId.newInstance(0, 0);
-      ApplicationAttemptId appAttemptId =
-          ApplicationAttemptId.newInstance(appId, 1);
-      UserGroupInformation ugi =
-          UserGroupInformation.createRemoteUser(appAttemptId.toString());
-      ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-          .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
-          .getKeyId()));
-      return ugi;
-    }
-
-    @Override
-    protected AsyncDispatcher createDispatcher() {
-      return new DrainDispatcher();
+        super(context, exec, deletionContext,
+          nodeStatusUpdater, metrics, dirsHandler, user);
     }
 
     /**
@@ -1060,27 +1271,10 @@ public class TestContainerSchedulerWithOverAllocation
                 throws Exception {
               return "123";
             }
-
           };
         }
       };
     }
-
-    @Override
-    protected ContainersMonitor createContainersMonitor(
-        ContainerExecutor exec) {
-      return new ContainerMonitorForOverallocationTest(exec,
-          dispatcher, context);
-    }
-
-    public void startContainersOutOfBandUponLowUtilization() {
-      ((ContainerMonitorForOverallocationTest) getContainersMonitor())
-          .attemptToStartContainersUponLowUtilization();
-    }
-
-    public void drainAsyncEvents() {
-      ((DrainDispatcher) dispatcher).await();
-    }
   }
 
   /**
@@ -1176,56 +1370,4 @@ public class TestContainerSchedulerWithOverAllocation
       }
     }
   }
-
-  /**
-   * A test implementation of container monitor that allows control of
-   * current resource utilization.
-   */
-  private static class ContainerMonitorForOverallocationTest
-      extends ContainersMonitorImpl {
-
-    private ResourceUtilization containerResourceUsage =
-        ResourceUtilization.newInstance(0, 0, 0.0f);
-
-    ContainerMonitorForOverallocationTest(ContainerExecutor exec,
-        AsyncDispatcher dispatcher, Context context) {
-      super(exec, dispatcher, context);
-    }
-
-    @Override
-    public long getPmemAllocatedForContainers() {
-      return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
-    }
-
-    @Override
-    public long getVmemAllocatedForContainers() {
-      float pmemRatio = getConfig().getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      return (long) (pmemRatio * getPmemAllocatedForContainers());
-    }
-
-    @Override
-    public long getVCoresAllocatedForContainers() {
-      return NM_CONTAINERS_VCORES;
-    }
-
-    @Override
-    public ContainersResourceUtilization getContainersUtilization(
-        boolean latest) {
-      return new ContainersMonitor.ContainersResourceUtilization(
-          containerResourceUsage, System.currentTimeMillis());
-    }
-
-    @Override
-    protected void checkOverAllocationPrerequisites() {
-      // do not check
-    }
-
-
-    public void setContainerResourceUsage(
-        ResourceUtilization containerResourceUsage) {
-      this.containerResourceUsage = containerResourceUsage;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/124b378f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..bbc7c49
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SnapshotBasedOverAllocationPreemptionPolicy}.
+ */
+public class TestSnapshotBasedOverAllocationPreemptionPolicy {
+  // Both the CPU preemption threshold and the memory preemption threshold
+  // are 75%
+  private final static ResourceThresholds PREEMPTION_THRESHOLDS =
+      ResourceThresholds.newInstance(0.75f, 0.75f);
+
+  // The CPU utilization is allowed to go over the cpu preemption threshold
+  // 2 times in a row before any container is preempted to reclaim cpu resources
+  private final static int MAX_CPU_OVER_PREEMPTION_THRESHOLDS = 2;
+
+  private final ContainersMonitor containersMonitor =
+      mock(ContainersMonitor.class);
+
+  @Before
+  public void setUp() {
+    // the node has an allocation of 2048 MB of memory
+    when(containersMonitor.getPmemAllocatedForContainers()).
+        thenReturn(2048 * 1024 * 1024L);
+  }
+
+  /**
+   * The memory utilization goes above its preemption threshold,
+   * 2048 * 0.75f = 1536 MB (the node has an allocation of 2048 MB memory).
+   */
+  @Test
+  public void testMemoryOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    // the current memory utilization, 2000 MB is over the preemption
+    // threshold, 2048 * 0.75, which is 1536 MB. The CPU utilization,
+    // 0.5f is below the preemption threshold, 0.75f.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 0.5f),
+            Time.now()));
+
+    // the amount of memory utilization over the preemption threshold, that is,
+    // 2000 - (2048 * 0.75) = 464 MB of memory, shall be reclaimed.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
+  /**
+   * The CPU utilization goes above its preemption threshold, 0.75f.
+   */
+  @Test
+  public void testCpuOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    // the current CPU utilization, 1.0f, is over the preemption threshold,
+    // 0.75f, for the first time. The memory utilization, 1000 MB is below
+    // the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 0.5f, is below the preemption threshold,
+    // 0.75f. In the meantime the memory utilization, 1000 MB is also below
+    // the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 0.5f),
+            Time.now()));
+    // no resources shall be reclaimed
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is over the preemption threshold,
+    // 0.75f. In the meantime the memory utilization, 1000 MB is below
+    // the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed because the cpu utilization is allowed
+    // to go over the preemption threshold at most two times in a row. It is
+    // just over the preemption threshold for the first time
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is again over the preemption
+    // threshold, 0.75f. In the meantime the memory utilization, 1000 MB
+    // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed because the cpu utilization is allowed
+    // to go over the preemption threshold at most two times in a row. It is
+    // just over the preemption threshold for the second time in a row
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is over the preemption threshold,
+    // the third time in a row. In the meantime the memory utilization, 1000 MB
+    // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // the amount of cpu utilization over the preemption threshold, that is,
+    // 1.0 - 0.75f = 0.25, shall be reclaimed.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.25f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
+  /**
+   * Both memory and CPU utilization go over their preemption thresholds
+   * respectively.
+   */
+  @Test
+  public void testMemoryCpuOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    // the current CPU utilization, 1.0f, is over the preemption threshold,
+    // 0.75f, for the first time. The memory utilization, 1000 MB is below
+    // the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed because the cpu utilization is allowed
+    // to go over the preemption threshold at most two times in a row. It is
+    // just over the preemption threshold for the first time.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 0.5f, is below the preemption threshold,
+    // 0.75f. The memory utilization, 2000 MB, however, is above the memory
+    // preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 0.5f),
+            Time.now()));
+    // the amount of memory utilization over the preemption threshold, that is,
+    // 2000 - (2048 * 0.75) = 464 MB of memory, shall be reclaimed.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is over the preemption threshold,
+    // 0.75f, for the first time. The memory utilization, 1000 MB is below
+    // the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed because the cpu utilization is allowed
+    // to go over the preemption threshold at most two times in a row. It is
+    // just over the preemption threshold for the first time.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is again over the preemption
+    // threshold, 0.75f. In the meantime the memory utilization, 1000 MB
+    // is still below the memory preemption threshold, 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            Time.now()));
+    // no resources shall be reclaimed because the cpu utilization is allowed
+    // to go over the preemption threshold at most two times in a row. It is
+    // just over the preemption threshold for the second time in a row.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // the current CPU utilization, 1.0f, is over the CPU preemption threshold,
+    // 0.75f, the third time in a row. In the meantime, the memory utilization,
+    // 2000 MB, is also over the memory preemption threshold,
+    // 2048 * 0.75 = 1536 MB.
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 1.0f),
+            Time.now()));
+    // the amount of memory utilization over the preemption threshold, that is,
+    // 2000 - (2048 * 0.75) = 464 MB of memory, and the amount of cpu
+    // utilization over the preemption threshold, that is, 1.0f - 0.75f = 0.25f,
+    // shall be reclaimed.
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0.25f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
+  /** Neither memory nor CPU utilization exceeds its preemption threshold. */
+  @Test
+  public void testBothMemoryAndCpuUnderPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy policy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(
+            PREEMPTION_THRESHOLDS, MAX_CPU_OVER_PREEMPTION_THRESHOLDS,
+            containersMonitor);
+
+    // Report 1000 MB of memory (threshold: 2048 * 0.75 = 1536 MB) and a CPU
+    // utilization of 0.5f (threshold: 0.75f), i.e. both under their limits.
+    ResourceUtilization underThresholds =
+        ResourceUtilization.newInstance(1000, 0, 0.5f);
+    when(containersMonitor.getContainersUtilization(anyBoolean()))
+        .thenReturn(new ContainersMonitor.ContainersResourceUtilization(
+            underThresholds, Time.now()));
+
+    // With neither threshold exceeded, the policy is expected to report
+    // that there is nothing to reclaim.
+    ResourceUtilization nothingToReclaim =
+        ResourceUtilization.newInstance(0, 0, 0f);
+    Assert.assertEquals(nothingToReclaim, policy.getResourcesToReclaim());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org