Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/18 19:25:30 UTC

[30/46] hadoop git commit: CapacityScheduler: Improve preemption to only kill containers that would satisfy the incoming request. (Wangda Tan)
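
In outline, this change makes preemption lazy: the ProportionalCapacityPreemptionPolicy now emits MARK_CONTAINER_FOR_KILLABLE events instead of killing containers outright, the new PreemptionManager tracks killable containers per queue and partition, and the CapacityScheduler kills a marked container only when doing so actually satisfies a pending request (exercised by the new TestCapacitySchedulerPreemption below). A minimal standalone sketch of that allocation-time decision, with simplified types standing in for the YARN classes (illustrative only, not the actual CapacityScheduler code):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class LazyPreemptionSketch {
      // containerId -> allocated resource, as marked killable by the edit policy
      private final Map<String, Long> killable = new LinkedHashMap<>();
      private long available;

      void markKillable(String containerId, long resource) {
        killable.put(containerId, resource);
      }

      // Kill marked containers only if that actually satisfies the request.
      boolean tryAllocate(long requested) {
        long potential = available;
        for (long r : killable.values()) {
          potential += r;
        }
        if (potential < requested) {
          return false; // killing everything still would not help; kill nothing
        }
        Iterator<Map.Entry<String, Long>> it = killable.entrySet().iterator();
        while (available < requested && it.hasNext()) {
          Map.Entry<String, Long> victim = it.next();
          available += victim.getValue(); // killing releases its allocation
          it.remove();
        }
        available -= requested;
        return true;
      }
    }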

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
new file mode 100644
index 0000000..19148d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class PreemptableQueue {
+  // Partition -> killable resources and containers
+  private Map<String, Resource> totalKillableResources = new HashMap<>();
+  private Map<String, Map<ContainerId, RMContainer>> killableContainers =
+      new HashMap<>();
+  private PreemptableQueue parent;
+
+  public PreemptableQueue(PreemptableQueue parent) {
+    this.parent = parent;
+  }
+
+  public PreemptableQueue(Map<String, Resource> totalKillableResources,
+      Map<String, Map<ContainerId, RMContainer>> killableContainers) {
+    this.totalKillableResources = totalKillableResources;
+    this.killableContainers = killableContainers;
+  }
+
+  void addKillableContainer(KillableContainer container) {
+    String partition = container.getNodePartition();
+    if (!totalKillableResources.containsKey(partition)) {
+      totalKillableResources.put(partition, Resources.createResource(0));
+      killableContainers.put(partition,
+          new ConcurrentSkipListMap<ContainerId, RMContainer>());
+    }
+
+    RMContainer c = container.getRMContainer();
+    Resources.addTo(totalKillableResources.get(partition),
+        c.getAllocatedResource());
+    killableContainers.get(partition).put(c.getContainerId(), c);
+
+    if (null != parent) {
+      parent.addKillableContainer(container);
+    }
+  }
+
+  void removeKillableContainer(KillableContainer container) {
+    String partition = container.getNodePartition();
+    Map<ContainerId, RMContainer> partitionKillableContainers =
+        killableContainers.get(partition);
+    if (partitionKillableContainers != null) {
+      RMContainer rmContainer = partitionKillableContainers.remove(
+          container.getRMContainer().getContainerId());
+      if (null != rmContainer) {
+        Resources.subtractFrom(totalKillableResources.get(partition),
+            rmContainer.getAllocatedResource());
+      }
+    }
+
+    if (null != parent) {
+      parent.removeKillableContainer(container);
+    }
+  }
+
+  public Resource getKillableResource(String partition) {
+    Resource res = totalKillableResources.get(partition);
+    return res == null ? Resources.none() : res;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
+    Map<ContainerId, RMContainer> map = killableContainers.get(partition);
+    return map == null ? Collections.EMPTY_MAP : map;
+  }
+
+  public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
+    return killableContainers;
+  }
+
+  Map<String, Resource> getTotalKillableResources() {
+    return totalKillableResources;
+  }
+}
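
For reference, a minimal standalone sketch of the bookkeeping pattern PreemptableQueue implements above: every addition and removal is applied locally and then propagated to the parent, so killable totals stay consistent along the whole queue hierarchy. Simplified types (long for Resource, no per-container map) stand in for the YARN classes; this is illustrative, not the production code.

    import java.util.HashMap;
    import java.util.Map;

    class QueueKillableTotals {
      private final Map<String, Long> killableByPartition = new HashMap<>();
      private final QueueKillableTotals parent; // null for the root queue

      QueueKillableTotals(QueueKillableTotals parent) {
        this.parent = parent;
      }

      void add(String partition, long resource) {
        killableByPartition.merge(partition, resource, Long::sum);
        if (parent != null) {
          parent.add(partition, resource); // keep every ancestor in sync
        }
      }

      void remove(String partition, long resource) {
        killableByPartition.merge(partition, -resource, Long::sum);
        if (parent != null) {
          parent.remove(partition, resource);
        }
      }

      long get(String partition) {
        return killableByPartition.getOrDefault(partition, 0L);
      }
    }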

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
new file mode 100644
index 0000000..a9f02a5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PreemptionManager {
+  private ReentrantReadWriteLock.ReadLock readLock;
+  private ReentrantReadWriteLock.WriteLock writeLock;
+  private Map<String, PreemptableQueue> entities = new HashMap<>();
+
+  public PreemptionManager() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  public void refreshQueues(CSQueue parent, CSQueue current) {
+    try {
+      writeLock.lock();
+      PreemptableQueue parentEntity = null;
+      if (parent != null) {
+        parentEntity = entities.get(parent.getQueueName());
+      }
+
+      if (!entities.containsKey(current.getQueueName())) {
+        entities.put(current.getQueueName(),
+            new PreemptableQueue(parentEntity));
+      }
+
+      if (current.getChildQueues() != null) {
+        for (CSQueue child : current.getChildQueues()) {
+          refreshQueues(current, child);
+        }
+      }
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void addKillableContainer(KillableContainer container) {
+    try {
+      writeLock.lock();
+      PreemptableQueue entity = entities.get(container.getLeafQueueName());
+      if (null != entity) {
+        entity.addKillableContainer(container);
+      }
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void removeKillableContainer(KillableContainer container) {
+    try {
+      writeLock.lock();
+      PreemptableQueue entity = entities.get(container.getLeafQueueName());
+      if (null != entity) {
+        entity.removeKillableContainer(container);
+      }
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void moveKillableContainer(KillableContainer oldContainer,
+      KillableContainer newContainer) {
+    // TODO: will be called when the partition of the node changes OR the
+    // container moves to a different queue
+  }
+
+  public void updateKillableContainerResource(KillableContainer container,
+      Resource oldResource, Resource newResource) {
+    // TODO: will be called when the container's resource changes
+  }
+
+  @VisibleForTesting
+  public Map<ContainerId, RMContainer> getKillableContainersMap(
+      String queueName, String partition) {
+    try {
+      readLock.lock();
+      PreemptableQueue entity = entities.get(queueName);
+      if (entity != null) {
+        Map<ContainerId, RMContainer> containers =
+            entity.getKillableContainers().get(partition);
+        if (containers != null) {
+          return containers;
+        }
+      }
+      return Collections.emptyMap();
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public Iterator<RMContainer> getKillableContainers(String queueName,
+      String partition) {
+    return getKillableContainersMap(queueName, partition).values().iterator();
+  }
+
+  public Resource getKillableResource(String queueName, String partition) {
+    try {
+      readLock.lock();
+      PreemptableQueue entity = entities.get(queueName);
+      if (entity != null) {
+        Resource res = entity.getTotalKillableResources().get(partition);
+        if (res == null || res.equals(Resources.none())) {
+          return Resources.none();
+        }
+        return Resources.clone(res);
+      }
+      return Resources.none();
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
+    try {
+      readLock.lock();
+      Map<String, PreemptableQueue> map = new HashMap<>();
+      for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
+        String key = entry.getKey();
+        PreemptableQueue entity = entry.getValue();
+        map.put(key, new PreemptableQueue(
+            new HashMap<>(entity.getTotalKillableResources()),
+            new HashMap<>(entity.getKillableContainers())));
+      }
+      return map;
+    } finally {
+      readLock.unlock();
+    }
+  }
+}
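
The manager above guards a single queue-name -> PreemptableQueue map with a ReentrantReadWriteLock: mutations take the write lock, queries take the read lock, and getShallowCopyOfPreemptableEntities() hands callers a snapshot they can iterate without holding any lock. A generic standalone sketch of that discipline (illustrative only):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class GuardedRegistry<K, V> {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final Map<K, V> entries = new HashMap<>();

      void put(K key, V value) {
        lock.writeLock().lock(); // writers are exclusive
        try {
          entries.put(key, value);
        } finally {
          lock.writeLock().unlock();
        }
      }

      V get(K key) {
        lock.readLock().lock(); // readers may proceed concurrently
        try {
          return entries.get(key);
        } finally {
          lock.readLock().unlock();
        }
      }

      // Shallow snapshot so callers can iterate lock-free, mirroring
      // getShallowCopyOfPreemptableEntities() above.
      Map<K, V> snapshot() {
        lock.readLock().lock();
        try {
          return new HashMap<>(entries);
        } finally {
          lock.readLock().unlock();
        }
      }
    }

Note the sketch acquires each lock before entering the try block, the conventional ordering; the code above calls lock() inside the try, which behaves the same when lock() succeeds but is slightly less idiomatic.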

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index 5158255..aad3bc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -120,9 +120,9 @@ public class AssignmentInformation {
   }
 
   private ContainerId getFirstContainerIdFromOperation(Operation op) {
-    if (null != operationDetails.get(Operation.ALLOCATION)) {
+    if (null != operationDetails.get(op)) {
       List<AssignmentDetails> assignDetails =
-          operationDetails.get(Operation.ALLOCATION);
+          operationDetails.get(op);
       if (!assignDetails.isEmpty()) {
         return assignDetails.get(0).containerId;
       }
@@ -131,7 +131,7 @@ public class AssignmentInformation {
   }
 
   public ContainerId getFirstAllocatedOrReservedContainerId() {
-    ContainerId containerId = null;
+    ContainerId containerId;
     containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
     if (null != containerId) {
       return containerId;
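
The hunk above fixes a copy-paste bug: getFirstContainerIdFromOperation ignored its op parameter and always consulted Operation.ALLOCATION, so the reservation lookup in getFirstAllocatedOrReservedContainerId could never see reservation details. A minimal standalone sketch of the corrected pattern (simplified, illustrative types):

    import java.util.EnumMap;
    import java.util.List;
    import java.util.Map;

    class FirstDetailLookup {
      enum Operation { ALLOCATION, RESERVATION }

      private final Map<Operation, List<String>> operationDetails =
          new EnumMap<>(Operation.class);

      // Look up by the operation actually requested, not a hardcoded constant.
      String firstIdFor(Operation op) {
        List<String> details = operationDetails.get(op);
        return (details == null || details.isEmpty()) ? null : details.get(0);
      }

      String firstAllocatedOrReserved() {
        String id = firstIdFor(Operation.ALLOCATION);
        return id != null ? id : firstIdFor(Operation.RESERVATION);
      }
    }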

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 4d563cd..f474aad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -94,6 +95,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   * to hold the message if its app doesn't get a container from a node
    */
   private String appSkipNodeDiagnostics;
+  private CapacitySchedulerContext capacitySchedulerContext;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -138,28 +140,30 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
     
     containerAllocator = new ContainerAllocator(this, rc, rmContext);
+
+    if (scheduler instanceof CapacityScheduler) {
+      capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
+    }
   }
 
-  synchronized public boolean containerCompleted(RMContainer rmContainer,
+  public synchronized boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {
+    ContainerId containerId = rmContainer.getContainerId();
 
     // Remove from the list of containers
-    if (null == liveContainers.remove(rmContainer.getContainerId())) {
+    if (null == liveContainers.remove(containerId)) {
       return false;
     }
-    
+
     // Remove from the list of newly allocated containers if found
     newlyAllocatedContainers.remove(rmContainer);
 
-    Container container = rmContainer.getContainer();
-    ContainerId containerId = container.getId();
-
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(containerId, containerStatus, event));
 
-    containersToPreempt.remove(rmContainer.getContainerId());
+    containersToPreempt.remove(containerId);
 
     RMAuditLogger.logSuccess(getUser(),
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@@ -176,7 +180,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return true;
   }
 
-  synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       Priority priority, ResourceRequest request, 
       Container container) {
 
@@ -200,7 +204,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
-    liveContainers.put(container.getId(), rmContainer);    
+
+    ContainerId containerId = container.getId();
+    liveContainers.put(containerId, rmContainer);
 
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
@@ -213,17 +219,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     // Inform the container
     rmContainer.handle(
-        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+        new RMContainerEvent(containerId, RMContainerEventType.START));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationAttemptId=" 
-          + container.getId().getApplicationAttemptId() 
-          + " container=" + container.getId() + " host="
+          + containerId.getApplicationAttemptId()
+          + " container=" + containerId + " host="
           + container.getNodeId().getHost() + " type=" + type);
     }
     RMAuditLogger.logSuccess(getUser(),
         AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
-        getApplicationId(), container.getId());
+        getApplicationId(), containerId);
     
     return rmContainer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index fe6db47..1d0e78a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -18,22 +18,29 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
+  private Map<ContainerId, RMContainer> killableContainers = new HashMap<>();
+  private Resource totalKillableResources = Resource.newInstance(0, 0);
   
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
       Set<String> nodeLabels) {
@@ -92,7 +99,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
   @Override
   public synchronized void unreserveResource(
       SchedulerApplicationAttempt application) {
-
     // adding NP checks as this can now be called for preemption
     if (getReservedContainer() != null
         && getReservedContainer().getContainer() != null
@@ -115,4 +121,55 @@ public class FiCaSchedulerNode extends SchedulerNode {
     }
     setReservedContainer(null);
   }
+
+  // According to decisions from the preemption policy, mark the container as killable
+  public synchronized void markContainerToKillable(ContainerId containerId) {
+    RMContainer c = launchedContainers.get(containerId);
+    if (c != null && !killableContainers.containsKey(containerId)) {
+      killableContainers.put(containerId, c);
+      Resources.addTo(totalKillableResources, c.getAllocatedResource());
+    }
+  }
+
+  // According to decisions from the preemption policy, mark the container as
+  // non-killable
+  public synchronized void markContainerToNonKillable(ContainerId containerId) {
+    RMContainer c = launchedContainers.get(containerId);
+    if (c != null && killableContainers.containsKey(containerId)) {
+      killableContainers.remove(containerId);
+      Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());
+    }
+  }
+
+  @Override
+  protected synchronized void updateResource(
+      Container container) {
+    super.updateResource(container);
+    if (killableContainers.containsKey(container.getId())) {
+      Resources.subtractFrom(totalKillableResources, container.getResource());
+      killableContainers.remove(container.getId());
+    }
+  }
+
+  @Override
+  protected synchronized void changeContainerResource(ContainerId containerId,
+      Resource deltaResource, boolean increase) {
+    super.changeContainerResource(containerId, deltaResource, increase);
+
+    if (killableContainers.containsKey(containerId)) {
+      if (increase) {
+        Resources.addTo(totalKillableResources, deltaResource);
+      } else {
+        Resources.subtractFrom(totalKillableResources, deltaResource);
+      }
+    }
+  }
+
+  public synchronized Resource getTotalKillableResources() {
+    return totalKillableResources;
+  }
+
+  public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
+    return killableContainers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 9cf09e9..35b7c14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -38,10 +38,15 @@ public enum SchedulerEventType {
   // Source: ContainerAllocationExpirer
   CONTAINER_EXPIRED,
 
-  // Source: SchedulingEditPolicy
+  /* Source: SchedulingEditPolicy */
   KILL_RESERVED_CONTAINER,
-  MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption
-                                 // in the near future
-  KILL_PREEMPTED_CONTAINER // Kill a container previously marked for
-                           // preemption
+
+  // Mark a container for preemption
+  MARK_CONTAINER_FOR_PREEMPTION,
+
+  // Mark a container previously marked for preemption as killable
+  MARK_CONTAINER_FOR_KILLABLE,
+
+  // Cancel the killable mark on a container
+  MARK_CONTAINER_FOR_NONKILLABLE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index d9306dd..c944752 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -59,7 +59,7 @@ public class TestRMDispatcher {
       rmDispatcher.getEventHandler().handle(event1);
       ContainerPreemptEvent event2 =
           new ContainerPreemptEvent(appAttemptId, container,
-            SchedulerEventType.KILL_PREEMPTED_CONTAINER);
+            SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE);
       rmDispatcher.getEventHandler().handle(event2);
       ContainerPreemptEvent event3 =
           new ContainerPreemptEvent(appAttemptId, container,
@@ -70,7 +70,7 @@ public class TestRMDispatcher {
       verify(sched, times(3)).handle(any(SchedulerEvent.class));
       verify(sched).killReservedContainer(container);
       verify(sched).markContainerForPreemption(appAttemptId, container);
-      verify(sched).killPreemptedContainer(container);
+      verify(sched).markContainerForKillable(container);
     } catch (InterruptedException e) {
       Assert.fail();
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 028afb1..3057615 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -2352,7 +2352,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
           .get(app0.getApplicationId()).getCurrentAppAttempt();
       // kill app0-attempt
-      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
+      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
           app0.getCurrentAppAttempt().getMasterContainer().getId()));
       am0.waitForState(RMAppAttemptState.FAILED);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 5035afe..16f3f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
@@ -566,7 +565,7 @@ public class TestAMRestart {
     ContainerId amContainer =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
     // Preempt the first attempt;
-    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
+    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
 
     am1.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@@ -582,7 +581,7 @@ public class TestAMRestart {
     // Preempt the second attempt.
     ContainerId amContainer2 =
         ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
-    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2));
+    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
 
     am2.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@@ -677,7 +676,7 @@ public class TestAMRestart {
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
 
     // Forcibly preempt the am container;
-    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
+    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
 
     am1.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 13f267d..e9129de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -167,6 +168,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getConfiguration()).thenReturn(schedConf);
     rmContext = mock(RMContext.class);
     when(mCS.getRMContext()).thenReturn(rmContext);
+    when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
     Dispatcher disp = mock(Dispatcher.class);
@@ -289,7 +291,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
     for (ContainerPreemptEvent e : events.subList(20, 20)) {
       assertEquals(appC, e.getAppId());
-      assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
+      assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 512f37c..21ea495 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -123,6 +124,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     mClock = mock(Clock.class);
     cs = mock(CapacityScheduler.class);
     when(cs.getResourceCalculator()).thenReturn(rc);
+    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
 
     nlm = mock(RMNodeLabelsManager.class);
     mDisp = mock(EventHandler.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 0b32676..171196f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -264,6 +265,7 @@ public class TestApplicationLimits {
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 1569a12..d8161f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -205,7 +205,7 @@ public class TestApplicationPriority {
       if (++counter > 2) {
         break;
       }
-      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check node report, 12 GB used and 4 GB available
@@ -512,7 +512,7 @@ public class TestApplicationPriority {
       if (++counter > 2) {
         break;
       }
-      cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
       iterator.remove();
     }
 
@@ -542,7 +542,7 @@ public class TestApplicationPriority {
       if (++counter > 1) {
         break;
       }
-      cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
       iterator.remove();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index b6c005b..16ba607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -1188,7 +1188,7 @@ public class TestCapacityScheduler {
 
     // kill the 3 containers
     for (Container c : allocatedContainers) {
-      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check values
@@ -1197,7 +1197,7 @@ public class TestCapacityScheduler {
         Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
 
     // kill app0-attempt0 AM container
-    cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
+    cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
         .getCurrentAppAttempt().getMasterContainer().getId()));
 
     // wait for app0 failed
@@ -1220,7 +1220,7 @@ public class TestCapacityScheduler {
     allocatedContainers =
         am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
     for (Container c : allocatedContainers) {
-      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check values
@@ -1269,7 +1269,7 @@ public class TestCapacityScheduler {
     }
 
     // Call killContainer to preempt the container
-    cs.killPreemptedContainer(rmContainer);
+    cs.markContainerForKillable(rmContainer);
 
     Assert.assertEquals(3, requests.size());
     for (ResourceRequest request : requests) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
new file mode 100644
index 0000000..bea7797
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerPreemption {
+  private static final Log LOG = LogFactory.getLog(
+      TestCapacitySchedulerPreemption.class);
+
+  private final int GB = 1024;
+
+  private Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations
+    conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+    conf.setFloat(
+        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
+    conf.setFloat(
+        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        mon = (SchedulingMonitor) service;
+        break;
+      }
+    }
+
+    if (mon != null) {
+      return mon.getSchedulingEditPolicy();
+    }
+    return null;
+  }
+
+  @Test (timeout = 60000)
+  public void testSimplePreemption() throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 submits to queue-a first and asks for 7 * 1G containers, so no
+     * more resources are available.
+     *
+     * 3) app2 submits to queue-c and asks for one 1G container (for its AM).
+     *
+     * Now the cluster is fully used.
+     *
+     * 4) app2 asks for another 1G container; the system will preempt one
+     * container from app1, and app2 will receive the freed resources.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app to queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and the cluster has no available resources
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c; it asks for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call editSchedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(1, killableContainers.size());
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 6 containers, and app2 has 2 containers
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersNodeLocalityDelay()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with locality specified, so it must
+     * wait for missed opportunities before being scheduled off-node. Check that
+     * the system waits for those missed opportunities before killing a container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app to queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and the cluster has no available resources
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c; it asks for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 each have 0G of available resource
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container, naming an unknown host and the default rack
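+    // Since host- and rack-level requests are present, node-locality delay
+    // applies: app2 skips early off-switch opportunities, which is why the
+    // first node update below allocates nothing.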
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, one container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersHardNodeLocality()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with hard locality specified, and
+     *         the requested host does not exist in the cluster.
+     * Confirm the system doesn't preempt any container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, leaving no available resource in the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 each have 0G of available resource
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container for h3 with hard locality,
+    // h3 doesn't exist in the cluster
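+    // The relaxLocality=false flags below express hard locality: the container
+    // may only be placed on h3, so the request can never be satisfied and no
+    // preemption should be triggered.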
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1, true), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h3",
+            Resources.createResource(1 * GB), 1, false), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1, false)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, nothing will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 7 containers, and app2 has 1 container (no new container allocated)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Killable containers' resources will be excluded by PCPP (no duplicate
+     *    containers added to the killable list)
+     * 2) When more resources need to be preempted, new containers will be
+     *    selected and the already-killable containers will be taken into account
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Launch an app in queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, leaving no available resource in the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+    // Check killable containers and to-be-preempted containers in edit policy
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Run edit schedule again, confirm the status doesn't change
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Save the current set of killable containers
+    Set<ContainerId> previousKillableContainers = new HashSet<>(
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+            .keySet());
+
+    // Increase queue-c's request from 1 to 2 containers, so one more
+    // container needs to be preempted
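+    // (the container already marked killable counts toward the new target, so
+    // the policy should select only one additional container)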
+    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+    // Call editPolicy.editSchedule() once; we should have 1 container in the
+    // to-preempt map and 1 container in the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+    // Call editPolicy.editSchedule() once more; we should have 2 containers in
+    // the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Check that the previous killable containers are included in the new
+    // killable set
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+    Assert.assertTrue(
+        Sets.difference(previousKillableContainers, killableContainers.keySet())
+            .isEmpty());
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Containers will be marked killable
+     * 2) The resource request is then reduced
+     * 3) Superfluous killable containers will be cancelled from the policy and
+     *    the scheduler
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Launch an app in queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, leaving no available resource in the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that 3 containers from app1 are
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+    // Change the request from 3G to 2G; now one less container needs to be preempted (3->2)
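+    // (the policy is expected to un-mark one of the previously selected
+    // killable containers rather than keep all three marked)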
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    // Call editSchedule once more to make sure nothing further happens
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersUserLimit()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate
+     * more than 1 container in queue-c
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
+     * there is no more resource available.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
+     *
+     * Now the cluster is fully allocated.
+     *
+     * 4) app2 asks for another 1G container; because of queue-c's user limit,
+     * no container from app1 should be preempted.
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
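+    // With user-limit-factor 0.1, a single user's limit in queue-c is roughly
+    // 0.1 * 5.6G (70% of the 8G cluster), which normalizes up to a single 1G
+    // container, so app2 should not grow beyond its AM container.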
+    MockRM rm1 = new MockRM(csConf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, leaving no available resource in the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 each have 0G of available resource
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that no container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // No preemption happens
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+    Assert.assertEquals(0, killableContainers.size());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  private Map<ContainerId, RMContainer> waitKillableContainersSize(
+      PreemptionManager pm, String queueName, String partition,
+      int expectedSize) throws InterruptedException {
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap(queueName, partition);
+
+    int wait = 0;
+    // Wait for at most 5 sec (it should be super fast actually)
+    while (expectedSize != killableContainers.size() && wait < 500) {
+      killableContainers = pm.getKillableContainersMap(queueName, partition);
+      Thread.sleep(10);
+      wait++;
+    }
+
+    Assert.assertEquals(expectedSize, killableContainers.size());
+    return killableContainers;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 5169337..1612201 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -99,6 +100,7 @@ public class TestChildQueueOrder {
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
   }
 
   private FiCaSchedulerApp getMockApplication(int appId, String user) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 69b0813..87a3d51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -150,6 +151,7 @@ public class TestLeafQueue {
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
@@ -3092,6 +3094,7 @@ public class TestLeafQueue {
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(2 * GB, 2));
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     return csContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index bbf6e43..1ee201d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1676,4 +1677,100 @@ public class TestNodeLabelContainerAllocation {
     checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
         cs.getApplicationAttempt(am1.getApplicationAttemptId()));
   }
+
+  @Test
+  public void testParentQueueMaxCapsAreRespected() throws Exception {
+    /*
+     * Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \
+     *     A1 A2
+     *
+     * A has 50% capacity and 50% max capacity (of label=x)
+     * A1/A2 each have 50% capacity and 100% max capacity (of label=x)
+     * Cluster has one node (label=x) with resource = 24G,
+     * so at most 12G of resources can be used under queue A.
+     */
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 10);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 90);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+    csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+    // Define 2nd-level queues
+    csConf.setQueues(A, new String[] { "a1",
+        "a2"});
+
+    final String A1 = A + ".a1";
+    csConf.setCapacity(A1, 50);
+    csConf.setAccessibleNodeLabels(A1, toSet("x"));
+    csConf.setCapacityByLabel(A1, "x", 50);
+    csConf.setMaximumCapacityByLabel(A1, "x", 100);
+    csConf.setUserLimitFactor(A1, 100.0f);
+
+    final String A2 = A + ".a2";
+    csConf.setCapacity(A2, 50);
+    csConf.setAccessibleNodeLabels(A2, toSet("x"));
+    csConf.setCapacityByLabel(A2, "x", 50);
+    csConf.setMaximumCapacityByLabel(A2, "x", 100);
+    csConf.setUserLimitFactor(A2, 100.0f);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of(
+        NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    MockNM nm1 =
+        new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 10);
+    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+
+    // Try to launch app2 in a2 with a 2GB AM; this should succeed
+    RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    // am2 asks for more resources; this cannot succeed because current usage =
+    // 9G (app1) + 2G (app2) = 11G, and another 2G container would exceed the
+    // queue's 12G max capacity
+    am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
+
+    doNMHeartbeat(rm, nm1.getNodeId(), 10);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index f73baa4..23dc860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,6 +93,7 @@ public class TestParentQueue {
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
     thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 2ef5e39..56facee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -126,6 +127,7 @@ public class TestReservations {
     when(csContext.getNonPartitionedQueueComparator()).thenReturn(
         CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
         conf);