You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/04/09 18:00:18 UTC

[hadoop] branch trunk updated: YARN-999. In case of long running tasks, reduce node resource should balloon out resource quickly by calling preemption API and suspending running task. Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfec455  YARN-999. In case of long running tasks, reduce node resource should balloon out resource quickly by calling preemption API and suspending running task. Contributed by Inigo Goiri.
cfec455 is described below

commit cfec455c452d85229ef2f9d83e6f7fc827946b59
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Tue Apr 9 10:59:43 2019 -0700

    YARN-999. In case of long running tasks, reduce node resource should balloon out resource quickly by calling preemption API and suspending running task. Contributed by Inigo Goiri.
---
 .../hadoop/yarn/api/records/ResourceOption.java    |   8 +-
 .../resourcemanager/ResourceTrackerService.java    |   5 +
 .../yarn/server/resourcemanager/rmnode/RMNode.java |  11 +
 .../server/resourcemanager/rmnode/RMNodeImpl.java  |  22 +-
 .../scheduler/AbstractYarnScheduler.java           |  77 ++-
 .../resourcemanager/scheduler/SchedulerNode.java   |  66 ++
 .../scheduler/fair/FairScheduler.java              |  27 +-
 .../yarn/server/resourcemanager/MockNodes.java     |   9 +
 .../scheduler/TestAbstractYarnScheduler.java       |  93 +++
 .../scheduler/TestSchedulerOvercommit.java         | 723 +++++++++++++++++++++
 .../scheduler/capacity/TestCapacityScheduler.java  | 205 +++---
 .../capacity/TestCapacitySchedulerOvercommit.java  |  52 ++
 .../fair/TestFairSchedulerOvercommit.java          |  46 ++
 .../hadoop-metrics2-resourcemanager.properties     |  23 +
 .../src/test/resources/hadoop-metrics2.properties  |  23 +
 15 files changed, 1295 insertions(+), 95 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index e9de052..86261de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -55,12 +55,16 @@ public abstract class ResourceOption {
    * Get timeout for tolerant of resource over-commitment
    * Note: negative value means no timeout so that allocated containers will
    * keep running until the end even under resource over-commitment cases.
-   * @return <em>overCommitTimeout</em> of the ResourceOption
+   * @return <em>overCommitTimeout</em> of the ResourceOption in milliseconds.
    */
   @Private
   @Evolving
   public abstract int getOverCommitTimeout();
-  
+
+  /**
+   * Set the overcommit timeout.
+   * @param overCommitTimeout Timeout in ms. Negative means no timeout.
+   */
   @Private
   @Evolving
   protected abstract void setOverCommitTimeout(int overCommitTimeout);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f021ebb..012f58a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -675,6 +675,11 @@ public class ResourceTrackerService extends AbstractService implements
     if (capability != null) {
       nodeHeartBeatResponse.setResource(capability);
     }
+    // Check if we got an event (AdminService) that updated the resources
+    if (rmNode.isUpdatedCapability()) {
+      nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
+      rmNode.resetUpdatedCapability();
+    }
 
     // 7. Send Container Queuing Limits back to the Node. This will be used by
     // the node to truncate the number of Containers queued for execution.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index c77d29c..d3b515e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -105,6 +105,17 @@ public interface RMNode {
   public Resource getTotalCapability();
 
   /**
+   * If the total available resources has been updated.
+   * @return If the capability has been updated.
+   */
+  boolean isUpdatedCapability();
+
+  /**
+   * Mark that the updated event has been processed.
+   */
+  void resetUpdatedCapability();
+
+  /**
    * the aggregated resource utilization of the containers.
    * @return the aggregated resource utilization of the containers.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 37f3a37..e94dfe0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -126,6 +126,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* Snapshot of total resources before receiving decommissioning command */
   private volatile Resource originalTotalCapability;
   private volatile Resource totalCapability;
+  private volatile boolean updatedCapability = false;
   private final Node node;
 
   private String healthReport;
@@ -457,6 +458,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   @Override
+  public boolean isUpdatedCapability() {
+    return this.updatedCapability;
+  }
+
+  @Override
+  public void resetUpdatedCapability() {
+    this.updatedCapability = false;
+  }
+
+  @Override
   public String getRackName() {
     return node.getNetworkLocation();
   }
@@ -814,11 +825,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
   
-  private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, 
-     RMNodeResourceUpdateEvent event){
-      ResourceOption resourceOption = event.getResourceOption();
-      // Set resource on RMNode
-      rmNode.totalCapability = resourceOption.getResource();
+  private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
+      RMNodeResourceUpdateEvent event){
+    ResourceOption resourceOption = event.getResourceOption();
+    // Set resource on RMNode
+    rmNode.totalCapability = resourceOption.getResource();
+    rmNode.updatedCapability = true;
   }
 
   private static NodeHealthStatus updateRMNodeFromStatusEvents(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 5168b34..5fd064b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -92,13 +92,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -116,6 +119,8 @@ public abstract class AbstractYarnScheduler
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractYarnScheduler.class);
 
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
   protected final ClusterNodeTracker<N> nodeTracker =
       new ClusterNodeTracker<>();
 
@@ -809,6 +814,7 @@ public abstract class AbstractYarnScheduler
     try {
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
+      final int timeout = resourceOption.getOverCommitTimeout();
       Resource oldResource = node.getTotalResource();
       if (!oldResource.equals(newResource)) {
         // Notify NodeLabelsManager about this change
@@ -816,13 +822,15 @@ public abstract class AbstractYarnScheduler
             newResource);
 
         // Log resource change
-        LOG.info("Update resource on node: " + node.getNodeName() + " from: "
-            + oldResource + ", to: " + newResource);
+        LOG.info("Update resource on node: {} from: {}, to: {} in {} ms",
+            node.getNodeName(), oldResource, newResource, timeout);
 
         nodeTracker.removeNode(nm.getNodeID());
 
         // update resource to node
         node.updateTotalResource(newResource);
+        node.setOvercommitTimeOut(timeout);
+        signalContainersIfOvercommitted(node, timeout == 0);
 
         nodeTracker.addNode((N) node);
       } else{
@@ -1165,6 +1173,10 @@ public abstract class AbstractYarnScheduler
       updateNodeResourceUtilization(nm, schedulerNode);
     }
 
+    if (schedulerNode != null) {
+      signalContainersIfOvercommitted(schedulerNode, true);
+    }
+
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
       LOG.debug(
@@ -1174,6 +1186,67 @@ public abstract class AbstractYarnScheduler
     }
   }
 
+  /**
+   * Check if the node is overcommitted and needs to remove containers. If
+   * it is overcommitted, it will kill or preempt (notify the AM to stop them)
+   * containers. It also takes into account the overcommit timeout. It only
+   * notifies the application to preempt a container if the timeout hasn't
+   * passed. If the timeout has passed, it tries to kill the containers. If
+   * there is no timeout, it doesn't do anything and just prevents new
+   * allocations.
+   *
+   * This action is taken when the change of resources happens (to preempt
+   * containers or killing them if specified) or when the node heart beats
+   * (for killing only).
+   *
+   * @param schedulerNode The node to check whether is overcommitted.
+   * @param kill If the container should be killed or just notify the AM.
+   */
+  private void signalContainersIfOvercommitted(
+      SchedulerNode schedulerNode, boolean kill) {
+
+    // If there is no time out, we don't do anything
+    if (!schedulerNode.isOvercommitTimeOutSet()) {
+      return;
+    }
+
+    SchedulerEventType eventType =
+        SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+    if (kill) {
+      eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+
+      // If it hasn't timed out yet, don't kill
+      if (!schedulerNode.isOvercommitTimedOut()) {
+        return;
+      }
+    }
+
+    // Check if the node is overcommitted (negative resources)
+    ResourceCalculator rc = getResourceCalculator();
+    Resource unallocated = Resource.newInstance(
+        schedulerNode.getUnallocatedResource());
+    if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+      return;
+    }
+
+    LOG.info("{} is overcommitted ({}), preempt/kill containers",
+        schedulerNode.getNodeID(), unallocated);
+    for (RMContainer container : schedulerNode.getContainersToKill()) {
+      LOG.info("Send {} to {} to free up {}", eventType,
+          container.getContainerId(), container.getAllocatedResource());
+      ApplicationAttemptId appId = container.getApplicationAttemptId();
+      ContainerPreemptEvent event =
+          new ContainerPreemptEvent(appId, container, eventType);
+      this.rmContext.getDispatcher().getEventHandler().handle(event);
+      Resources.addTo(unallocated, container.getAllocatedResource());
+
+      if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+        LOG.debug("Enough unallocated resources {}", unallocated);
+        break;
+      }
+    }
+  }
+
   @Override
   public Resource getNormalizedResource(Resource requestedResource,
                                         Resource maxResourceCapability) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index e36bc64..ef03aad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -70,6 +72,8 @@ public abstract class SchedulerNode {
       ResourceUtilization.newInstance(0, 0, 0f);
   private volatile ResourceUtilization nodeUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
+  /** Time stamp for overcommitted resources to time out. */
+  private long overcommitTimeout = -1;
 
   /* set of containers that are allocated containers */
   private final Map<ContainerId, ContainerInfo> launchedContainers =
@@ -120,6 +124,38 @@ public abstract class SchedulerNode {
   }
 
   /**
+   * Set the timeout for the node to stop overcommitting the resources. After
+   * this time the scheduler will start killing containers until the resources
+   * are not overcommitted anymore. This may reset a previous timeout.
+   * @param timeOut Time out in milliseconds.
+   */
+  public synchronized void setOvercommitTimeOut(long timeOut) {
+    if (timeOut >= 0) {
+      if (this.overcommitTimeout != -1) {
+        LOG.debug("The overcommit timeout for {} was already set to {}",
+            getNodeID(), this.overcommitTimeout);
+      }
+      this.overcommitTimeout = Time.now() + timeOut;
+    }
+  }
+
+  /**
+   * Check if the time out has passed.
+   * @return If the node is overcommitted.
+   */
+  public synchronized boolean isOvercommitTimedOut() {
+    return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout;
+  }
+
+  /**
+   * Check if the node has a time out for overcommit resources.
+   * @return If the node has a time out for overcommit resources.
+   */
+  public synchronized boolean isOvercommitTimeOutSet() {
+    return this.overcommitTimeout >= 0;
+  }
+
+  /**
    * Get the ID of the node which contains both its hostname and port.
    * @return The ID of the node.
    */
@@ -373,6 +409,36 @@ public abstract class SchedulerNode {
   }
 
   /**
+   * Get the containers running on the node ordered by which to kill first. It
+   * tries to kill AMs last, then GUARANTEED containers, and it kills
+   * OPPORTUNISTIC first. If the same time, it uses the creation time.
+   * @return A copy of the running containers ordered by which to kill first.
+   */
+  public List<RMContainer> getContainersToKill() {
+    List<RMContainer> result = getLaunchedContainers();
+    Collections.sort(result, (c1, c2) -> {
+      return new CompareToBuilder()
+          .append(c1.isAMContainer(), c2.isAMContainer())
+          .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
+          .append(c2.getCreationTime(), c1.getCreationTime()) // reversed
+          .toComparison();
+    });
+    return result;
+  }
+
+  /**
+   * Get the launched containers in the node.
+   * @return List of launched containers.
+   */
+  protected synchronized List<RMContainer> getLaunchedContainers() {
+    List<RMContainer> result = new ArrayList<>();
+    for (ContainerInfo info : launchedContainers.values()) {
+      result.add(info.container);
+    }
+    return result;
+  }
+
+  /**
    * Get the container for the specified container ID.
    * @param containerId The container ID
    * @return The container for the specified container ID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 166bb48..151a7ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1288,8 +1289,32 @@ public class FairScheduler extends
               SchedulerUtils.EXPIRED_CONTAINER),
           RMContainerEventType.EXPIRE);
       break;
+    case MARK_CONTAINER_FOR_PREEMPTION:
+      if (!(event instanceof ContainerPreemptEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      ContainerPreemptEvent preemptContainerEvent =
+          (ContainerPreemptEvent)event;
+      ApplicationAttemptId appId = preemptContainerEvent.getAppId();
+      RMContainer preemptedContainer = preemptContainerEvent.getContainer();
+      FSAppAttempt app = getApplicationAttempt(appId);
+      app.trackContainerForPreemption(preemptedContainer);
+      break;
+    case MARK_CONTAINER_FOR_KILLABLE:
+      if (!(event instanceof ContainerPreemptEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      ContainerPreemptEvent containerKillableEvent =
+          (ContainerPreemptEvent)event;
+      RMContainer killableContainer = containerKillableEvent.getContainer();
+      completedContainer(killableContainer,
+          SchedulerUtils.createPreemptedContainerStatus(
+              killableContainer.getContainerId(),
+              SchedulerUtils.PREEMPTED_CONTAINER),
+          RMContainerEventType.KILL);
+      break;
     default:
-      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
+      LOG.error("Unknown event arrived at FairScheduler: {}", event);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index c0af041..3b72ca1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -191,6 +191,15 @@ public class MockNodes {
     }
 
     @Override
+    public boolean isUpdatedCapability() {
+      return false;
+    }
+
+    @Override
+    public void resetUpdatedCapability() {
+    }
+
+    @Override
     public String getRackName() {
       return this.rackName;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ba409b1..b58c7a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +37,7 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.*;
@@ -1018,4 +1021,94 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       System.out.println("Stopping testContainerRecoveredByNode");
     }
   }
+
+  /**
+   * Test the order we get the containers to kill. It should respect the order
+   * described in {@link SchedulerNode#getContainersToKill()}.
+   */
+  @Test
+  public void testGetRunningContainersToKill() {
+    final SchedulerNode node = new MockSchedulerNode();
+    assertEquals(Collections.emptyList(), node.getContainersToKill());
+
+    // AM0
+    RMContainer am0 = newMockRMContainer(
+        true, ExecutionType.GUARANTEED, "AM0");
+    node.allocateContainer(am0);
+    assertEquals(Arrays.asList(am0), node.getContainersToKill());
+
+    // OPPORTUNISTIC0, AM0
+    RMContainer opp0 = newMockRMContainer(
+        false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0");
+    node.allocateContainer(opp0);
+    assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill());
+
+    // OPPORTUNISTIC0, GUARANTEED0, AM0
+    RMContainer regular0 = newMockRMContainer(
+        false, ExecutionType.GUARANTEED, "GUARANTEED0");
+    node.allocateContainer(regular0);
+    assertEquals(Arrays.asList(opp0, regular0, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0
+    RMContainer opp1 = newMockRMContainer(
+        false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1");
+    node.allocateContainer(opp1);
+    assertEquals(Arrays.asList(opp1, opp0, regular0, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0
+    RMContainer am1 = newMockRMContainer(
+        true, ExecutionType.GUARANTEED, "AM1");
+    node.allocateContainer(am1);
+    assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0
+    RMContainer regular1 = newMockRMContainer(
+        false, ExecutionType.GUARANTEED, "GUARANTEED1");
+    node.allocateContainer(regular1);
+    assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0),
+        node.getContainersToKill());
+  }
+
+  private static RMContainer newMockRMContainer(boolean isAMContainer,
+      ExecutionType executionType, String name) {
+    RMContainer container = mock(RMContainer.class);
+    when(container.isAMContainer()).thenReturn(isAMContainer);
+    when(container.getExecutionType()).thenReturn(executionType);
+    when(container.getCreationTime()).thenReturn(Time.now());
+    when(container.toString()).thenReturn(name);
+    return container;
+  }
+
+  /**
+   * SchedulerNode mock to test launching containers.
+   */
+  class MockSchedulerNode extends SchedulerNode {
+    private final List<RMContainer> containers = new ArrayList<>();
+
+    MockSchedulerNode() {
+      super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false);
+    }
+
+    @Override
+    protected List<RMContainer> getLaunchedContainers() {
+      return containers;
+    }
+
+    @Override
+    public void allocateContainer(RMContainer rmContainer) {
+      containers.add(rmContainer);
+      // Shuffle for testing
+      Collections.shuffle(containers);
+    }
+
+    @Override
+    public void reserveResource(SchedulerApplicationAttempt attempt,
+        SchedulerRequestKey schedulerKey, RMContainer container) {}
+
+    @Override
+    public void unreserveResource(SchedulerApplicationAttempt attempt) {}
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
new file mode 100644
index 0000000..cc665fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
@@ -0,0 +1,723 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestResourceTrackerService.NullNodeAttributeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generic tests for overcommitting resources. This needs to be instantiated
+ * with a scheduler ({@link YarnConfiguration.RM_SCHEDULER}).
+ *
+ * If reducing the amount of resources leads to overcommitting (negative
+ * available resources), the scheduler will select containers to make room.
+ * <ul>
+ * <li>If there is no timeout (&lt;0), it doesn't kill or preempt surplus
+ * containers.</li>
+ * <li>If the timeout is 0, it kills the surplus containers immediately.</li>
+ * <li>If the timeout is larger than 0, it first asks the application to
+ * preempt those containers and after the timeout passes, it kills the surplus
+ * containers.</li>
+ * </ul>
+ */
+public abstract class TestSchedulerOvercommit {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSchedulerOvercommit.class);
+
+  /** 1 GB in MB. */
+  protected final static int GB = 1024;
+
+  /** We do scheduling and heart beat every 200ms. */
+  protected static final int INTERVAL = 200;
+
+
+  /** Mock Resource Manager. */
+  private MockRM rm;
+  /** Scheduler for the Mock Resource Manager.*/
+  private ResourceScheduler scheduler;
+
+  /** Node Manager running containers. */
+  private MockNM nm;
+  private NodeId nmId;
+
+  /** Application to allocate containers. */
+  private RMAppAttempt attempt;
+  private MockAM am;
+
+  /**
+   * Setup the cluster with: an RM, a NM and an application for test.
+   * @throws Exception If it cannot set up the cluster.
+   */
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Setting up the test cluster...");
+
+    // Start the Resource Manager
+    Configuration conf = getConfiguration();
+    rm = new MockRM(conf);
+    rm.start();
+    scheduler = rm.getResourceScheduler();
+
+    // Add a Node Manager with 4GB
+    nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    nmId = nm.getNodeId();
+
+    // Start an AM with 2GB
+    RMApp app = rm.submitApp(2 * GB);
+    nm.nodeHeartbeat(true);
+    attempt = app.getCurrentAppAttempt();
+    am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+
+    // After allocation, used 2GB and remaining 2GB on the NM
+    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
+    nm.nodeHeartbeat(true);
+  }
+
+  /**
+   * Get the configuration for the scheduler. This is used when setting up the
+   * Resource Manager and should setup the scheduler (e.g., Capacity Scheduler
+   * or Fair Scheduler). It needs to set the configuration with
+   * {@link YarnConfiguration.RM_SCHEDULER}.
+   * @return Configuration for the scheduler.
+   */
+  protected Configuration getConfiguration() {
+    Configuration conf = new YarnConfiguration();
+
+    // Prevent loading node attributes
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        NullNodeAttributeStore.class, NodeAttributeStore.class);
+
+    return conf;
+  }
+
+  /**
+   * Stops the default application and the RM (with the scheduler).
+   * @throws Exception If it cannot stop the cluster.
+   */
+  @After
+  public void cleanup() throws Exception {
+    LOG.info("Cleaning up the test cluster...");
+
+    if (am != null) {
+      am.unregisterAppAttempt();
+      am = null;
+    }
+    if (rm != null) {
+      rm.drainEvents();
+      rm.stop();
+      rm = null;
+    }
+  }
+
+
+  /**
+   * Reducing the resources with no timeout should prevent new containers
+   * but wait for the current ones without killing.
+   */
+  @Test
+  public void testReduceNoTimeout() throws Exception {
+
+    // New 2GB container should give 4 GB used (2+2) and 0 GB available
+    Container c1 = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // Update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // The used resource should still be 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, INTERVAL, 2 * 1000);
+    // Check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // Check that we did not get a preemption request
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // Check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for container to be finished for app...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1,
+        INTERVAL, 2 * 1000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // Verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // Try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(INTERVAL);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
+    }
+  }
+
+  /**
+   * Changing resources multiples times without waiting for the
+   * timeout.
+   */
+  @Test
+  public void testChangeResourcesNoTimeout() throws Exception {
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 0 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 4 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+    // The application should still be running without issues.
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+  }
+
+  /**
+   * Reducing the resources with 0 time out kills the container right away.
+   */
+  @Test
+  public void testReduceKill() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // Reducing to 2GB should kill the container
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 2 * GB, 2, 0);
+    waitMemory(scheduler, nm, 2 * GB, 0 * GB, INTERVAL, 2 * INTERVAL);
+
+    // Check that the new container was killed
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus containerStatus = completedContainers.get(0);
+    assertContainerKilled(container.getId(), containerStatus);
+
+    // It should kill the containers right away
+    assertTime(0, Time.now() - t0);
+  }
+
+  /**
+   * Reducing the resources with a time out should first preempt and then kill.
+   */
+  @Test
+  public void testReducePreemptAndKill() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // We give an overcommit time out of 2 seconds
+    final int timeout = (int)TimeUnit.SECONDS.toMillis(2);
+
+    // Reducing to 2GB should first preempt the container
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+    // We should receive a notification to preempt the container
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(container.getId(), preemptMsg);
+
+    // Wait until the container is killed
+    waitMemory(scheduler, nm, 2 * GB, 0, INTERVAL, timeout + 2 * INTERVAL);
+
+    // Check that the container was killed
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus containerStatus = completedContainers.get(0);
+    assertContainerKilled(container.getId(), containerStatus);
+
+    // Check how long it took to kill the container
+    assertTime(timeout, Time.now() - t0);
+  }
+
+  /**
+   * Reducing the resources (with a time out) triggers a preemption message to
+   * the AM right away. Then, increasing them again should prevent the killing
+   * when the time out would have happened.
+   */
+  @Test
+  public void testReducePreemptAndCancel() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // We give an overcommit time out of 2 seconds
+    final int timeout = (int)TimeUnit.SECONDS.toMillis(1);
+
+    // Reducing to 2GB should first preempt the container
+    updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+    // We should receive a notification to preempt the container
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(container.getId(), preemptMsg);
+
+    // Increase the resources again
+    updateNodeResource(rm, nmId, 4 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, 0, INTERVAL, timeout);
+
+    long t0 = Time.now();
+    while (Time.now() - t0 < TimeUnit.SECONDS.toMillis(2)) {
+      nm.nodeHeartbeat(true);
+      AllocateResponse allocation = am.schedule();
+      assertNoPreemption(allocation.getPreemptionMessage());
+      assertTrue(allocation.getCompletedContainersStatuses().isEmpty());
+      Thread.sleep(INTERVAL);
+    }
+
+    // Check that the containers are still running
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+  }
+
+  /**
+   * Test the order we kill multiple containers.
+   * It initially has: AM(2GB), C1(1GB), C2(1GB), AM2(2GB), and C3(2GB).
+   * It should kill in this order: C3, C2, C1, AM2, and AM1.
+   */
+  @Test
+  public void testKillMultipleContainers() throws Exception {
+
+    updateNodeResource(rm, nmId, 8 * GB, 6, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 6 * GB, 200, 5 * 1000);
+
+    // Start 2 containers with 1 GB each
+    Container c1 = createContainer(am, 1 * GB);
+    Container c2 = createContainer(am, 1 * GB);
+    waitMemory(scheduler, nmId, 4 * GB, 4 * GB, 200, 5 * 1000);
+
+    // Start an AM with 2GB
+    RMApp app2 = rm.submitApp(2 * GB, "app2", "user2");
+    nm.nodeHeartbeat(true);
+    RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
+    MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
+    am2.registerAppAttempt();
+    waitMemory(scheduler, nm, 6 * GB, 2 * GB, 200, 5 * 1000);
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    Container c3 = createContainer(am2, 2 * GB);
+    waitMemory(scheduler, nm, 8 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers());
+
+    // Reduce the resources to kill C3 and C2 (not AM2)
+    updateNodeResource(rm, nmId, 5 * GB, 6, 0);
+    waitMemory(scheduler, nm, 5 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers());
+
+    List<ContainerStatus> completedContainers =
+        am2.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container3Status = completedContainers.get(0);
+    assertContainerKilled(c3.getId(), container3Status);
+
+    completedContainers = am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container2Status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), container2Status);
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    // Reduce the resources to kill C1 (not AM2)
+    updateNodeResource(rm, nmId, 4 * GB, 6, 0);
+    waitMemory(scheduler, nm, 4 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+    completedContainers = am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container1Status = completedContainers.get(0);
+    assertContainerKilled(c1.getId(), container1Status);
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    // Reduce the resources to kill AM2
+    updateNodeResource(rm, nmId, 2 * GB, 6, 0);
+    waitMemory(scheduler, nm, 2 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers());
+    assertEquals(RMAppAttemptState.FAILED, attempt2.getState());
+
+    // The first application should be fine and still running
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+  }
+
+  @Test
+  public void testEndToEnd() throws Exception {
+
+    Container c1 = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // check node report, 4 GB used and 0 GB available
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    nm.nodeHeartbeat(true);
+    assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption requests
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(100);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
+    }
+
+    // increase the resources again to 5 GB to schedule the 3GB container
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+    // kick the scheduling and check it took effect
+    nm.nodeHeartbeat(true);
+    while (allocResponse2.getAllocatedContainers().isEmpty()) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(100);
+      allocResponse2 = am.schedule();
+    }
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId, c2.getNodeId());
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preempt request to the AM for c2
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // increasing the resources again, should stop killing the containers
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+    Thread.sleep(3 * 1000);
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preempt request to the AM for c2
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // wait until the scheduler kills the container
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm.nodeHeartbeat(true); // trigger preemption in the NM
+      } catch (Exception e) {
+        LOG.error("Cannot heartbeat", e);
+      }
+      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+      return report.getAvailableResource().getMemorySize() > 0;
+    }, 200, 5 * 1000);
+    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), c2status);
+
+    assertTime(2000, Time.now() - t0);
+  }
+
+  /**
+   * Create a container with a particular size and make sure it succeeds.
+   * @param am Application Master to add the container to.
+   * @param memory Memory of the container.
+   * @return Newly created container.
+   * @throws Exception If there are issues creating the container.
+   */
+  protected Container createContainer(
+      final MockAM app, final int memory) throws Exception {
+
+    ResourceRequest req = ResourceRequest.newBuilder()
+        .capability(Resource.newInstance(memory, 1))
+        .numContainers(1)
+        .build();
+    AllocateResponse response = app.allocate(singletonList(req), emptyList());
+    List<Container> allocated = response.getAllocatedContainers();
+    nm.nodeHeartbeat(true);
+    for (int i = 0; allocated.isEmpty() && i < 10; i++) {
+      LOG.info("Waiting for containers to be created for app...");
+      Thread.sleep(INTERVAL);
+      response = app.schedule();
+      allocated = response.getAllocatedContainers();
+      nm.nodeHeartbeat(true);
+    }
+    assertFalse("Cannot create the container", allocated.isEmpty());
+
+    assertEquals(1, allocated.size());
+    final Container c = allocated.get(0);
+    assertEquals(memory, c.getResource().getMemorySize());
+    assertEquals(nmId, c.getNodeId());
+    return c;
+  }
+
+  /**
+   * Update the resources on a Node Manager.
+   * @param rm Resource Manager to contact.
+   * @param nmId Identifier of the Node Manager.
+   * @param memory Memory in MB.
+   * @param vCores Number of virtual cores.
+   * @param overcommitTimeout Timeout for overcommit.
+   * @throws Exception If the update cannot be completed.
+   */
+  public static void updateNodeResource(MockRM rm, NodeId nmId,
+      int memory, int vCores, int overcommitTimeout) throws Exception {
+    AdminService admin = rm.getAdminService();
+    ResourceOption resourceOption = ResourceOption.newInstance(
+        Resource.newInstance(memory, vCores), overcommitTimeout);
+    UpdateNodeResourceRequest req = UpdateNodeResourceRequest.newInstance(
+        singletonMap(nmId, resourceOption));
+    admin.updateNodeResource(req);
+  }
+
+  /**
+   * Make sure that the container was killed.
+   * @param containerId Expected container identifier.
+   * @param status Container status to check.
+   */
+  public static void assertContainerKilled(
+      final ContainerId containerId, final ContainerStatus status) {
+    assertEquals(containerId, status.getContainerId());
+    assertEquals(ContainerState.COMPLETE, status.getState());
+    assertEquals(ContainerExitStatus.PREEMPTED, status.getExitStatus());
+    assertEquals(SchedulerUtils.PREEMPTED_CONTAINER, status.getDiagnostics());
+  }
+
+  /**
+   * Check that an elapsed time is at least the expected time and no more than
+   * two heart beats/scheduling rounds.
+   * @param expectedTime Time expected in milliseconds.
+   * @param time Actual time to check.
+   */
+  public static void assertTime(final long expectedTime, final long time) {
+    assertTrue("Too short: " + time + "ms", time > expectedTime);
+    assertTrue("Too long: " + time + "ms",
+        time < (expectedTime + 2 * INTERVAL));
+  }
+
+  /**
+   * Check that the scheduler didn't ask to preempt anything.
+   * @param msg Preemption message from the scheduler.
+   */
+  public static void assertNoPreemption(final PreemptionMessage msg) {
+    if (msg != null &&
+        msg.getContract() != null &&
+        !msg.getContract().getContainers().isEmpty()) {
+      fail("We shouldn't preempt containers: " + msg);
+    }
+  }
+
+  /**
+   * Check that the scheduler ask to preempt a particular container.
+   * @param containerId Expected container to preempt.
+   * @param msg Preemption message from the scheduler.
+   */
+  public static void assertPreemption(
+      final ContainerId containerId, final PreemptionMessage msg) {
+    assertNotNull("Expected a preemption message", msg);
+    Set<ContainerId> preemptContainers = new HashSet<>();
+    if (msg.getContract() != null) {
+      for (PreemptionContainer c : msg.getContract().getContainers()) {
+        preemptContainers.add(c.getId());
+      }
+    }
+    if (msg.getStrictContract() != null) {
+      for (PreemptionContainer c : msg.getStrictContract().getContainers()) {
+        preemptContainers.add(c.getId());
+      }
+    }
+    assertEquals(Collections.singleton(containerId), preemptContainers);
+  }
+
+  /**
+   * Check if a node report has the expected memory values.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   */
+  public static void assertMemory(ResourceScheduler scheduler, NodeId nmId,
+      long expectedUsed, long expectedAvailable) {
+    SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+    assertNotNull(nmReport);
+    Resource used = nmReport.getUsedResource();
+    assertEquals("Used memory", expectedUsed, used.getMemorySize());
+    Resource available = nmReport.getAvailableResource();
+    assertEquals("Available memory",
+        expectedAvailable, available.getMemorySize());
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * It does not trigger NM heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler,
+      NodeId nmId, int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+    waitMemory(scheduler, nmId, null, expectedUsed, expectedAvailable,
+        checkEveryMillis, waitForMillis);
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * It triggers NM heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nm Node Manager to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler, MockNM nm,
+      int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+    waitMemory(scheduler, nm.getNodeId(), nm, expectedUsed, expectedAvailable,
+        checkEveryMillis, waitForMillis);
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * If the NM is specified, it does heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param nm Node Manager to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler,
+      NodeId nmId, MockNM nm,
+      int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+
+    long start = Time.monotonicNow();
+    while (Time.monotonicNow() - start < waitForMillis) {
+      try {
+        if (nm != null) {
+          nm.nodeHeartbeat(true);
+        }
+        assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+        return;
+      } catch (AssertionError e) {
+        Thread.sleep(checkEveryMillis);
+      }
+    }
+
+    // No success, notify time out
+    SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+    Resource used = nmReport.getUsedResource();
+    Resource available = nmReport.getAvailableResource();
+    throw new TimeoutException("Took longer than " + waitForMillis +
+        "ms to get to " + expectedUsed + "," + expectedAvailable +
+        " actual=" + used + "," + available);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 5cb49a4..fd8fa05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -21,6 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -57,6 +64,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -76,12 +84,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -94,8 +102,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -1310,110 +1316,139 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
   @Test
   public void testResourceOverCommit() throws Exception {
-    int waitCount;
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
     MockRM rm = new MockRM(conf);
     rm.start();
+    ResourceScheduler scheduler = rm.getResourceScheduler();
 
-    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
-    RMApp app1 = rm.submitApp(2048);
-    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
-    nm1.nodeHeartbeat(true);
-    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
-    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-    am1.registerAppAttempt();
-    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
-        nm1.getNodeId());
-    // check node report, 2 GB used and 2 GB available
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    NodeId nmId = nm.getNodeId();
+    RMApp app = rm.submitApp(2048);
+    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm
+    nm.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am.registerAppAttempt();
+    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
 
-    // add request for containers
-    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
-    AllocateResponse alloc1Response = am1.schedule(); // send the request
+    // add request for 1 container of 2 GB
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
+    AllocateResponse alloc1Response = am.schedule(); // send the request
 
     // kick the scheduler, 2 GB given to AM1, resource remaining 0
-    nm1.nodeHeartbeat(true);
-    while (alloc1Response.getAllocatedContainers().size() < 1) {
+    nm.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().isEmpty()) {
       LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(100);
-      alloc1Response = am1.schedule();
+      alloc1Response = am.schedule();
     }
 
     List<Container> allocated1 = alloc1Response.getAllocatedContainers();
-    Assert.assertEquals(1, allocated1.size());
-    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
-    Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    // check node report, 4 GB used and 0 GB available
-    Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-
-    // check container is assigned with 2 GB.
+    assertEquals(1, allocated1.size());
     Container c1 = allocated1.get(0);
-    Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
-
-    // update node resource to 2 GB, so resource is over-consumed.
-    Map<NodeId, ResourceOption> nodeResourceMap =
-        new HashMap<NodeId, ResourceOption>();
-    nodeResourceMap.put(nm1.getNodeId(),
-        ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
-    UpdateNodeResourceRequest request =
-        UpdateNodeResourceRequest.newInstance(nodeResourceMap);
-    AdminService as = ((MockRM)rm).getAdminService();
-    as.updateNodeResource(request);
-
-    waitCount = 0;
-    while (waitCount++ != 20) {
-      report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-      if (report_nm1.getAvailableResource().getMemorySize() != 0) {
-        break;
-      }
-      LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
-          + waitCount + " times already..");
-      Thread.sleep(1000);
-    }
-    // Now, the used resource is still 4 GB, and available resource is minus value.
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    assertEquals(2 * GB, c1.getResource().getMemorySize());
+    assertEquals(nmId, c1.getNodeId());
 
-    // Check container can complete successfully in case of resource over-commitment.
+    // check node report, 4 GB used and 0 GB available
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    nm.nodeHeartbeat(true);
+    assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption requests
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
-    nm1.containerStatus(containerStatus);
-    waitCount = 0;
-    while (attempt1.getJustFinishedContainers().size() < 1
-        && waitCount++ != 20) {
-      LOG.info("Waiting for containers to be finished for app 1... Tried "
-          + waitCount + " times already..");
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
       Thread.sleep(100);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
     }
-    Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
-    Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
-    // As container return 2 GB back, the available resource becomes 0 again.
-    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
-
-    // Verify no NPE is trigger in schedule after resource is updated.
-    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
-    alloc1Response = am1.schedule();
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
-    int times = 0;
-    // try 10 times as scheduling is async process.
-    while (alloc1Response.getAllocatedContainers().size() < 1
-        && times++ < 10) {
-      LOG.info("Waiting for containers to be allocated for app 1... Tried "
-          + times + " times already..");
+
+    // increase the resources again to 5 GB to schedule the 3GB container
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+    // kick the scheduling and check it took effect
+    nm.nodeHeartbeat(true);
+    while (allocResponse2.getAllocatedContainers().isEmpty()) {
+      LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(100);
+      allocResponse2 = am.schedule();
     }
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId, c2.getNodeId());
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preempt request to the AM for c2
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // increasing the resources again, should stop killing the containers
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+    Thread.sleep(3 * 1000);
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preempt request to the AM for c2
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // wait until the scheduler kills the container
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm.nodeHeartbeat(true); // trigger preemption in the NM
+      } catch (Exception e) {
+        LOG.error("Cannot heartbeat", e);
+      }
+      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+      return report.getAvailableResource().getMemorySize() > 0;
+    }, 200, 5 * 1000);
+    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), c2status);
+
+    assertTime(2000, Time.now() - t0);
+
     rm.stop();
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
new file mode 100644
index 0000000..27eb3ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
@@ -0,0 +1,52 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Test changing resources and overcommit in the Capacity Scheduler
+ * {@link CapacityScheduler}.
+ */
+public class TestCapacitySchedulerOvercommit extends TestSchedulerOvercommit {
+
+  @Override
+  protected Configuration getConfiguration() {
+    Configuration conf = super.getConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+
+    // Remove limits on AMs to allow multiple applications running
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+        CapacitySchedulerConfiguration.ROOT, 100.0f);
+    csConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT, "", 100.0f);
+    csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+        CapacitySchedulerConfiguration.ROOT + ".default", 100.0f);
+    csConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT + ".default", "", 100.0f);
+
+    return csConf;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
new file mode 100644
index 0000000..9d31f99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Test changing resources and overcommit in the Fair Scheduler
+ * {@link FairScheduler}.
+ */
+public class TestFairSchedulerOvercommit extends TestSchedulerOvercommit {
+
+  @Override
+  protected Configuration getConfiguration() {
+    Configuration conf = super.getConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        FairScheduler.class, ResourceScheduler.class);
+
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10 * GB);
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+    return conf;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties
new file mode 100644
index 0000000..addcd53
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties
@@ -0,0 +1,23 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
+*.periodMillis=100
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties
new file mode 100644
index 0000000..addcd53
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties
@@ -0,0 +1,23 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
+*.periodMillis=100


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org