You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/18 19:25:33 UTC

[33/46] hadoop git commit: Revert "CapacityScheduler: Improve preemption to only kill containers that would satisfy the incoming request. (Wangda Tan)"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
deleted file mode 100644
index 19148d7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-public class PreemptableQueue {
-  // Partition -> killable resources and containers
-  private Map<String, Resource> totalKillableResources = new HashMap<>();
-  private Map<String, Map<ContainerId, RMContainer>> killableContainers =
-      new HashMap<>();
-  private PreemptableQueue parent;
-
-  public PreemptableQueue(PreemptableQueue parent) {
-    this.parent = parent;
-  }
-
-  public PreemptableQueue(Map<String, Resource> totalKillableResources,
-      Map<String, Map<ContainerId, RMContainer>> killableContainers) {
-    this.totalKillableResources = totalKillableResources;
-    this.killableContainers = killableContainers;
-  }
-
-  void addKillableContainer(KillableContainer container) {
-    String partition = container.getNodePartition();
-    if (!totalKillableResources.containsKey(partition)) {
-      totalKillableResources.put(partition, Resources.createResource(0));
-      killableContainers.put(partition,
-          new ConcurrentSkipListMap<ContainerId, RMContainer>());
-    }
-
-    RMContainer c = container.getRMContainer();
-    Resources.addTo(totalKillableResources.get(partition),
-        c.getAllocatedResource());
-    killableContainers.get(partition).put(c.getContainerId(), c);
-
-    if (null != parent) {
-      parent.addKillableContainer(container);
-    }
-  }
-
-  void removeKillableContainer(KillableContainer container) {
-    String partition = container.getNodePartition();
-    Map<ContainerId, RMContainer> partitionKillableContainers =
-        killableContainers.get(partition);
-    if (partitionKillableContainers != null) {
-      RMContainer rmContainer = partitionKillableContainers.remove(
-          container.getRMContainer().getContainerId());
-      if (null != rmContainer) {
-        Resources.subtractFrom(totalKillableResources.get(partition),
-            rmContainer.getAllocatedResource());
-      }
-    }
-
-    if (null != parent) {
-      parent.removeKillableContainer(container);
-    }
-  }
-
-  public Resource getKillableResource(String partition) {
-    Resource res = totalKillableResources.get(partition);
-    return res == null ? Resources.none() : res;
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
-    Map<ContainerId, RMContainer> map = killableContainers.get(partition);
-    return map == null ? Collections.EMPTY_MAP : map;
-  }
-
-  public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
-    return killableContainers;
-  }
-
-  Map<String, Resource> getTotalKillableResources() {
-    return totalKillableResources;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
deleted file mode 100644
index a9f02a5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class PreemptionManager {
-  private ReentrantReadWriteLock.ReadLock readLock;
-  private ReentrantReadWriteLock.WriteLock writeLock;
-  private Map<String, PreemptableQueue> entities = new HashMap<>();
-
-  public PreemptionManager() {
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    readLock = lock.readLock();
-    writeLock = lock.writeLock();
-  }
-
-  public void refreshQueues(CSQueue parent, CSQueue current) {
-    try {
-      writeLock.lock();
-      PreemptableQueue parentEntity = null;
-      if (parent != null) {
-        parentEntity = entities.get(parent.getQueueName());
-      }
-
-      if (!entities.containsKey(current.getQueueName())) {
-        entities.put(current.getQueueName(),
-            new PreemptableQueue(parentEntity));
-      }
-
-      if (current.getChildQueues() != null) {
-        for (CSQueue child : current.getChildQueues()) {
-          refreshQueues(current, child);
-        }
-      }
-    }
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void addKillableContainer(KillableContainer container) {
-    try {
-      writeLock.lock();
-      PreemptableQueue entity = entities.get(container.getLeafQueueName());
-      if (null != entity) {
-        entity.addKillableContainer(container);
-      }
-    }
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void removeKillableContainer(KillableContainer container) {
-    try {
-      writeLock.lock();
-      PreemptableQueue entity = entities.get(container.getLeafQueueName());
-      if (null != entity) {
-        entity.removeKillableContainer(container);
-      }
-    }
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void moveKillableContainer(KillableContainer oldContainer,
-      KillableContainer newContainer) {
-    // TODO, will be called when partition of the node changed OR
-    // container moved to different queue
-  }
-
-  public void updateKillableContainerResource(KillableContainer container,
-      Resource oldResource, Resource newResource) {
-    // TODO, will be called when container's resource changed
-  }
-
-  @VisibleForTesting
-  public Map<ContainerId, RMContainer> getKillableContainersMap(
-      String queueName, String partition) {
-    try {
-      readLock.lock();
-      PreemptableQueue entity = entities.get(queueName);
-      if (entity != null) {
-        Map<ContainerId, RMContainer> containers =
-            entity.getKillableContainers().get(partition);
-        if (containers != null) {
-          return containers;
-        }
-      }
-      return Collections.emptyMap();
-    }
-    finally {
-      readLock.unlock();
-    }
-  }
-
-  public Iterator<RMContainer> getKillableContainers(String queueName,
-      String partition) {
-    return getKillableContainersMap(queueName, partition).values().iterator();
-  }
-
-  public Resource getKillableResource(String queueName, String partition) {
-    try {
-      readLock.lock();
-      PreemptableQueue entity = entities.get(queueName);
-      if (entity != null) {
-        Resource res = entity.getTotalKillableResources().get(partition);
-        if (res == null || res.equals(Resources.none())) {
-          return Resources.none();
-        }
-        return Resources.clone(res);
-      }
-      return Resources.none();
-    }
-    finally {
-      readLock.unlock();
-    }
-  }
-
-  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
-    try {
-      readLock.lock();
-      Map<String, PreemptableQueue> map = new HashMap<>();
-      for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
-        String key = entry.getKey();
-        PreemptableQueue entity = entry.getValue();
-        map.put(key, new PreemptableQueue(
-            new HashMap<>(entity.getTotalKillableResources()),
-            new HashMap<>(entity.getKillableContainers())));
-      }
-      return map;
-    } finally {
-      readLock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index aad3bc7..5158255 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -120,9 +120,9 @@ public class AssignmentInformation {
   }
 
   private ContainerId getFirstContainerIdFromOperation(Operation op) {
-    if (null != operationDetails.get(op)) {
+    if (null != operationDetails.get(Operation.ALLOCATION)) {
       List<AssignmentDetails> assignDetails =
-          operationDetails.get(op);
+          operationDetails.get(Operation.ALLOCATION);
       if (!assignDetails.isEmpty()) {
         return assignDetails.get(0).containerId;
       }
@@ -131,7 +131,7 @@ public class AssignmentInformation {
   }
 
   public ContainerId getFirstAllocatedOrReservedContainerId() {
-    ContainerId containerId;
+    ContainerId containerId = null;
     containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
     if (null != containerId) {
       return containerId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f474aad..4d563cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -95,7 +94,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * to hold the message if its app doesn't not get container from a node
    */
   private String appSkipNodeDiagnostics;
-  private CapacitySchedulerContext capacitySchedulerContext;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -140,30 +138,28 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
     
     containerAllocator = new ContainerAllocator(this, rc, rmContext);
-
-    if (scheduler instanceof CapacityScheduler) {
-      capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
-    }
   }
 
-  public synchronized boolean containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {
-    ContainerId containerId = rmContainer.getContainerId();
 
     // Remove from the list of containers
-    if (null == liveContainers.remove(containerId)) {
+    if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
-
+    
     // Remove from the list of newly allocated containers if found
     newlyAllocatedContainers.remove(rmContainer);
 
+    Container container = rmContainer.getContainer();
+    ContainerId containerId = container.getId();
+
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(containerId, containerStatus, event));
 
-    containersToPreempt.remove(containerId);
+    containersToPreempt.remove(rmContainer.getContainerId());
 
     RMAuditLogger.logSuccess(getUser(),
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@@ -180,7 +176,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return true;
   }
 
-  public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       Priority priority, ResourceRequest request, 
       Container container) {
 
@@ -204,9 +200,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);
-
-    ContainerId containerId = container.getId();
-    liveContainers.put(containerId, rmContainer);
+    liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
@@ -219,17 +213,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     // Inform the container
     rmContainer.handle(
-        new RMContainerEvent(containerId, RMContainerEventType.START));
+        new RMContainerEvent(container.getId(), RMContainerEventType.START));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationAttemptId=" 
-          + containerId.getApplicationAttemptId()
-          + " container=" + containerId + " host="
+          + container.getId().getApplicationAttemptId() 
+          + " container=" + container.getId() + " host="
           + container.getNodeId().getHost() + " type=" + type);
     }
     RMAuditLogger.logSuccess(getUser(),
         AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
-        getApplicationId(), containerId);
+        getApplicationId(), container.getId());
     
     return rmContainer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 1d0e78a..fe6db47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -18,29 +18,22 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
+
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
-  private Map<ContainerId, RMContainer> killableContainers = new HashMap<>();
-  private Resource totalKillableResources = Resource.newInstance(0, 0);
   
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
       Set<String> nodeLabels) {
@@ -99,6 +92,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
   @Override
   public synchronized void unreserveResource(
       SchedulerApplicationAttempt application) {
+
     // adding NP checks as this can now be called for preemption
     if (getReservedContainer() != null
         && getReservedContainer().getContainer() != null
@@ -121,55 +115,4 @@ public class FiCaSchedulerNode extends SchedulerNode {
     }
     setReservedContainer(null);
   }
-
-  // According to decisions from preemption policy, mark the container to killable
-  public synchronized void markContainerToKillable(ContainerId containerId) {
-    RMContainer c = launchedContainers.get(containerId);
-    if (c != null && !killableContainers.containsKey(containerId)) {
-      killableContainers.put(containerId, c);
-      Resources.addTo(totalKillableResources, c.getAllocatedResource());
-    }
-  }
-
-  // According to decisions from preemption policy, mark the container to
-  // non-killable
-  public synchronized void markContainerToNonKillable(ContainerId containerId) {
-    RMContainer c = launchedContainers.get(containerId);
-    if (c != null && killableContainers.containsKey(containerId)) {
-      killableContainers.remove(containerId);
-      Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());
-    }
-  }
-
-  @Override
-  protected synchronized void updateResource(
-      Container container) {
-    super.updateResource(container);
-    if (killableContainers.containsKey(container.getId())) {
-      Resources.subtractFrom(totalKillableResources, container.getResource());
-      killableContainers.remove(container.getId());
-    }
-  }
-
-  @Override
-  protected synchronized void changeContainerResource(ContainerId containerId,
-      Resource deltaResource, boolean increase) {
-    super.changeContainerResource(containerId, deltaResource, increase);
-
-    if (killableContainers.containsKey(containerId)) {
-      if (increase) {
-        Resources.addTo(totalKillableResources, deltaResource);
-      } else {
-        Resources.subtractFrom(totalKillableResources, deltaResource);
-      }
-    }
-  }
-
-  public synchronized Resource getTotalKillableResources() {
-    return totalKillableResources;
-  }
-
-  public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
-    return killableContainers;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 35b7c14..9cf09e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -38,15 +38,10 @@ public enum SchedulerEventType {
   // Source: ContainerAllocationExpirer
   CONTAINER_EXPIRED,
 
-  /* Source: SchedulingEditPolicy */
+  // Source: SchedulingEditPolicy
   KILL_RESERVED_CONTAINER,
-
-  // Mark a container for preemption
-  MARK_CONTAINER_FOR_PREEMPTION,
-
-  // Mark a for-preemption container killable
-  MARK_CONTAINER_FOR_KILLABLE,
-
-  // Cancel a killable container
-  MARK_CONTAINER_FOR_NONKILLABLE
+  MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption
+                                 // in the near future
+  KILL_PREEMPTED_CONTAINER // Kill a container previously marked for
+                           // preemption
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index c944752..d9306dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -59,7 +59,7 @@ public class TestRMDispatcher {
       rmDispatcher.getEventHandler().handle(event1);
       ContainerPreemptEvent event2 =
           new ContainerPreemptEvent(appAttemptId, container,
-            SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE);
+            SchedulerEventType.KILL_PREEMPTED_CONTAINER);
       rmDispatcher.getEventHandler().handle(event2);
       ContainerPreemptEvent event3 =
           new ContainerPreemptEvent(appAttemptId, container,
@@ -70,7 +70,7 @@ public class TestRMDispatcher {
       verify(sched, times(3)).handle(any(SchedulerEvent.class));
       verify(sched).killReservedContainer(container);
       verify(sched).markContainerForPreemption(appAttemptId, container);
-      verify(sched).markContainerForKillable(container);
+      verify(sched).killPreemptedContainer(container);
     } catch (InterruptedException e) {
       Assert.fail();
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 3057615..028afb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -2352,7 +2352,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
           .get(app0.getApplicationId()).getCurrentAppAttempt();
       // kill app0-attempt
-      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(
+      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
           app0.getCurrentAppAttempt().getMasterContainer().getId()));
       am0.waitForState(RMAppAttemptState.FAILED);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 16f3f60..5035afe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
@@ -565,7 +566,7 @@ public class TestAMRestart {
     ContainerId amContainer =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
     // Preempt the first attempt;
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
 
     am1.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@@ -581,7 +582,7 @@ public class TestAMRestart {
     // Preempt the second attempt.
     ContainerId amContainer2 =
         ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
+    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2));
 
     am2.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@@ -676,7 +677,7 @@ public class TestAMRestart {
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
 
     // Forcibly preempt the am container;
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+    scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
 
     am1.waitForState(RMAppAttemptState.FAILED);
     Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index e9129de..13f267d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -168,7 +167,6 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getConfiguration()).thenReturn(schedConf);
     rmContext = mock(RMContext.class);
     when(mCS.getRMContext()).thenReturn(rmContext);
-    when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
     Dispatcher disp = mock(Dispatcher.class);
@@ -291,7 +289,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
     for (ContainerPreemptEvent e : events.subList(20, 20)) {
       assertEquals(appC, e.getAppId());
-      assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
+      assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 21ea495..512f37c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -124,7 +123,6 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     mClock = mock(Clock.class);
     cs = mock(CapacityScheduler.class);
     when(cs.getResourceCalculator()).thenReturn(rc);
-    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
 
     nlm = mock(RMNodeLabelsManager.class);
     mDisp = mock(EventHandler.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 171196f..0b32676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -265,7 +264,6 @@ public class TestApplicationLimits {
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index d8161f8..1569a12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -205,7 +205,7 @@ public class TestApplicationPriority {
       if (++counter > 2) {
         break;
       }
-      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check node report, 12 GB used and 4 GB available
@@ -512,7 +512,7 @@ public class TestApplicationPriority {
       if (++counter > 2) {
         break;
       }
-      cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
       iterator.remove();
     }
 
@@ -542,7 +542,7 @@ public class TestApplicationPriority {
       if (++counter > 1) {
         break;
       }
-      cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
       iterator.remove();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 16ba607..b6c005b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -1188,7 +1188,7 @@ public class TestCapacityScheduler {
 
     // kill the 3 containers
     for (Container c : allocatedContainers) {
-      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check values
@@ -1197,7 +1197,7 @@ public class TestCapacityScheduler {
         Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
 
     // kill app0-attempt0 AM container
-    cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
+    cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
         .getCurrentAppAttempt().getMasterContainer().getId()));
 
     // wait for app0 failed
@@ -1220,7 +1220,7 @@ public class TestCapacityScheduler {
     allocatedContainers =
         am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
     for (Container c : allocatedContainers) {
-      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
+      cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
     }
 
     // check values
@@ -1269,7 +1269,7 @@ public class TestCapacityScheduler {
     }
 
     // Call killContainer to preempt the container
-    cs.markContainerForKillable(rmContainer);
+    cs.killPreemptedContainer(rmContainer);
 
     Assert.assertEquals(3, requests.size());
     for (ResourceRequest request : requests) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index bea7797..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,677 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
-  private static final Log LOG = LogFactory.getLog(
-      TestCapacitySchedulerPreemption.class);
-
-  private final int GB = 1024;
-
-  private Configuration conf;
-
-  RMNodeLabelsManager mgr;
-
-  Clock clock;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
-    // Set preemption related configurations
-    conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
-        0);
-    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
-        true);
-    conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
-    conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
-    mgr = new NullRMNodeLabelsManager();
-    mgr.init(this.conf);
-    clock = mock(Clock.class);
-    when(clock.getTime()).thenReturn(0L);
-  }
-
-  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
-    RMActiveServices activeServices = rm.getRMActiveService();
-    SchedulingMonitor mon = null;
-    for (Service service : activeServices.getServices()) {
-      if (service instanceof SchedulingMonitor) {
-        mon = (SchedulingMonitor) service;
-        break;
-      }
-    }
-
-    if (mon != null) {
-      return mon.getSchedulingEditPolicy();
-    }
-    return null;
-  }
-
-  @Test (timeout = 60000)
-  public void testSimplePreemption() throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(1, killableContainers.size());
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 6 containers, and app2 has 2 containers
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersNodeLocalityDelay()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with locality specified, so it needs
-     * to wait for missed-opportunity before get scheduled.
-     * Check if system waits missed-opportunity before finish killable container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container with unknown host and unknown rack
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "unknownhost",
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, one container will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersHardNodeLocality()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with hard locality specified, and
-     *         asked host is not existed
-     * Confirm system doesn't preempt any container.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container for h3 with hard locality,
-    // h3 doesn't exist in the cluster
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1, true), ResourceRequest
-        .newInstance(Priority.newInstance(1), "h3",
-            Resources.createResource(1 * GB), 1, false), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1, false)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, nothing will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 7 containers, and app2 has 1 containers (no container allocated)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Killable containers resources will be excluded from PCPP (no duplicated
-     *    container added to killable list)
-     * 2) When more resources need to be preempted, new containers will be selected
-     *    and killable containers will be considered
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-    // Check killable containers and to-be-preempted containers in edit policy
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Run edit schedule again, confirm status doesn't changed
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Save current to kill containers
-    Set<ContainerId> previousKillableContainers = new HashSet<>(
-        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-            .keySet());
-
-    // Update request resource of c from 1 to 2, so we need to preempt
-    // one more container
-    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-    // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
-    // and 1 container in killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-    // Call editPolicy.editSchedule() once more, we should have 2 containers killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Check if previous killable containers included by new killable containers
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-    Assert.assertTrue(
-        Sets.difference(previousKillableContainers, killableContainers.keySet())
-            .isEmpty());
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Containers will be marked to killable
-     * 2) Cancel resource request
-     * 3) Killable containers will be cancelled from policy and scheduler
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if 3 container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
-    // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
-    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
-    // Call editSchedule once more to make sure still nothing happens
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersUserLimit()
-      throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
-    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-    MockRM rm1 = new MockRM(csConf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if no container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    // No preemption happens
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-    Assert.assertEquals(0, killableContainers.size());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  private Map<ContainerId, RMContainer> waitKillableContainersSize(
-      PreemptionManager pm, String queueName, String partition,
-      int expectedSize) throws InterruptedException {
-    Map<ContainerId, RMContainer> killableContainers =
-        pm.getKillableContainersMap(queueName, partition);
-
-    int wait = 0;
-    // Wait for at most 5 sec (it should be super fast actually)
-    while (expectedSize != killableContainers.size() && wait < 500) {
-      killableContainers = pm.getKillableContainersMap(queueName, partition);
-      Thread.sleep(10);
-      wait++;
-    }
-
-    Assert.assertEquals(expectedSize, killableContainers.size());
-    return killableContainers;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 1612201..5169337 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -100,7 +99,6 @@ public class TestChildQueueOrder {
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
   }
 
   private FiCaSchedulerApp getMockApplication(int appId, String user) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 87a3d51..69b0813 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -151,7 +150,6 @@ public class TestLeafQueue {
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
@@ -3094,7 +3092,6 @@ public class TestLeafQueue {
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(2 * GB, 2));
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     return csContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 1ee201d..bbf6e43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1677,100 +1676,4 @@ public class TestNodeLabelContainerAllocation {
     checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
         cs.getApplicationAttempt(am1.getApplicationAttemptId()));
   }
-
-  @Test
-  public void testParentQueueMaxCapsAreRespected() throws Exception {
-    /*
-     * Queue tree:
-     *          Root
-     *        /     \
-     *       A       B
-     *      / \
-     *     A1 A2
-     *
-     * A has 50% capacity and 50% max capacity (of label=x)
-     * A1/A2 has 50% capacity and 100% max capacity (of label=x)
-     * Cluster has one node (label=x) with resource = 24G.
-     * So we can at most use 12G resources under queueA.
-     */
-    CapacitySchedulerConfiguration csConf =
-        new CapacitySchedulerConfiguration(this.conf);
-
-    // Define top-level queues
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
-        "b"});
-    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    csConf.setCapacity(A, 10);
-    csConf.setAccessibleNodeLabels(A, toSet("x"));
-    csConf.setCapacityByLabel(A, "x", 50);
-    csConf.setMaximumCapacityByLabel(A, "x", 50);
-
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    csConf.setCapacity(B, 90);
-    csConf.setAccessibleNodeLabels(B, toSet("x"));
-    csConf.setCapacityByLabel(B, "x", 50);
-    csConf.setMaximumCapacityByLabel(B, "x", 50);
-
-    // Define 2nd-level queues
-    csConf.setQueues(A, new String[] { "a1",
-        "a2"});
-
-    final String A1 = A + ".a1";
-    csConf.setCapacity(A1, 50);
-    csConf.setAccessibleNodeLabels(A1, toSet("x"));
-    csConf.setCapacityByLabel(A1, "x", 50);
-    csConf.setMaximumCapacityByLabel(A1, "x", 100);
-    csConf.setUserLimitFactor(A1, 100.0f);
-
-    final String A2 = A + ".a2";
-    csConf.setCapacity(A2, 50);
-    csConf.setAccessibleNodeLabels(A2, toSet("x"));
-    csConf.setCapacityByLabel(A2, "x", 50);
-    csConf.setMaximumCapacityByLabel(A2, "x", 100);
-    csConf.setUserLimitFactor(A2, 100.0f);
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of(
-        NodeLabel.newInstance("x", false)));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
-
-    // inject node label manager
-    MockRM rm = new MockRM(csConf) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm.getRMContext().setNodeLabelManager(mgr);
-    rm.start();
-
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
-    MockNM nm1 =
-        new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
-    nm1.registerNode();
-
-    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
-    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
-    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
-    doNMHeartbeat(rm, nm1.getNodeId(), 10);
-    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
-        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
-
-    // Try to launch app2 in a2, asked 2GB, should success
-    RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
-
-    // am2 asks more resources, cannot success because current used = 9G (app1)
-    // + 2G (app2) = 11G, and queue's max capacity = 12G
-    am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
-
-    doNMHeartbeat(rm, nm1.getNodeId(), 10);
-    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
-        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 23dc860..f73baa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -93,7 +92,6 @@ public class TestParentQueue {
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
     thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 56facee..2ef5e39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -127,7 +126,6 @@ public class TestReservations {
     when(csContext.getNonPartitionedQueueComparator()).thenReturn(
         CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
         conf);