You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/09 20:39:39 UTC

[1/4] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 bced9e2e5 -> 40367c8da


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
new file mode 100644
index 0000000..5067412
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class TestCapacitySchedulerSurgicalPreemption
+    extends CapacitySchedulerPreemptionTestBase {
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        true);
+  }
+
+  @Test(timeout = 60000)
+  public void testSimpleSurgicalPreemption()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 submit to queue-a first, it asked 32 * 1G containers
+     * We will allocate 16 on n1 and 16 on n2.
+     *
+     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+     *
+     * 4) app2 asks for another 6G container, it will be reserved on n1
+     *
+     * Now: we have:
+     * n1: 17 from app1, 1 from app2, and 1 reserved from app2
+     * n2: 16 from app1.
+     *
+     * After preemption, we should expect:
+     * Preempt 4 containers from app1 on n1.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 32, new ArrayList<ContainerId>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 32; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 33 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(33, schedulerApp1.getLiveContainers().size());
+    // 17 from n1 and 16 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 17);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 16);
+
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1/NM2 have available resource = 2G/4G
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(4 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 6 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(6 * GB), 1)), null);
+
+    // Call allocation once on n1, we should expect the container reserved on n1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call editSchedule twice and check that 4 app1 containers on n1 are killed
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 29);
+
+    // 13 from n1 (4 preempted) and 16 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 13);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 16);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testSurgicalPreemptionWithAvailableResource()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 submit to queue-a first, it asked 38 * 1G containers
+     * We will allocate 20 on n1 and 19 on n2.
+     *
+     * 3) app2 submit to queue-c, ask for one 4G container (for AM)
+     *
+     * After preemption, we should expect:
+     * Preempt 3 containers from app1 and AM of app2 successfully allocated.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 38; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 39 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+    // 20 from n1 and 19 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 20);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 19);
+
+
+    // Submit app2 to queue-c and asks for a 4G container for AM
+    RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Call editSchedule: containers are selected to be preemption candidates
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 36);
+
+    // Call allocation, containers are reserved
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice, then allocate until the container is allocated
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      tick++;
+      Thread.sleep(100);
+    }
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
+
+    rm1.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 0d09e8f..10f6c2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -5329,7 +5329,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
           resourceManager
               .getResourceScheduler()
               .getSchedulerNode(resourceEvent.getNodeId())
-              .setTotalResource(resourceEvent.getResourceOption().getResource());
+              .updateTotalResource(resourceEvent.getResourceOption().getResource());
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 3167726..d9ab823 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -1202,7 +1202,7 @@ public class TestFifoScheduler {
           resourceManager
               .getResourceScheduler()
               .getSchedulerNode(resourceEvent.getNodeId())
-              .setTotalResource(resourceEvent.getResourceOption().getResource());
+              .updateTotalResource(resourceEvent.getResourceOption().getResource());
         }
       }
     });


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/4] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan

Posted by ep...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..e60c384
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,687 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ProportionalCapacityPreemptionPolicyMockFramework {
+  static final Log LOG =
+      LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
+  final String ROOT = CapacitySchedulerConfiguration.ROOT;
+
+  Map<String, CSQueue> nameToCSQueues = null;
+  Map<String, Resource> partitionToResource = null;
+  Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
+  RMNodeLabelsManager nlm = null;
+  RMContext rmContext = null;
+
+  ResourceCalculator rc = new DefaultResourceCalculator();
+  Clock mClock = null;
+  CapacitySchedulerConfiguration conf = null;
+  CapacityScheduler cs = null;
+  EventHandler<SchedulerEvent> mDisp = null;
+  ProportionalCapacityPreemptionPolicy policy = null;
+  Resource clusterResource = null;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() {
+    org.apache.log4j.Logger.getRootLogger().setLevel(
+        org.apache.log4j.Level.DEBUG);
+
+    conf = new CapacitySchedulerConfiguration(new Configuration(false));
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        3000);
+    // report "ideal" preempt
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 1.0);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        (float) 1.0);
+
+    mClock = mock(Clock.class);
+    cs = mock(CapacityScheduler.class);
+    when(cs.getResourceCalculator()).thenReturn(rc);
+    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
+    when(cs.getConfiguration()).thenReturn(conf);
+
+    nlm = mock(RMNodeLabelsManager.class);
+    mDisp = mock(EventHandler.class);
+
+    rmContext = mock(RMContext.class);
+    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
+    when(cs.getRMContext()).thenReturn(rmContext);
+
+    partitionToResource = new HashMap<>();
+    nodeIdToSchedulerNodes = new HashMap<>();
+    nameToCSQueues = new HashMap<>();
+  }
+
+  public void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig) throws IOException {
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
+  }
+
+  public void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig,
+      boolean useDominantResourceCalculator) throws IOException {
+    if (useDominantResourceCalculator) {
+      when(cs.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
+    mockNodeLabelsManager(labelsConfig);
+    mockSchedulerNodes(nodesConfig);
+    for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
+      when(cs.getSchedulerNode(nodeId)).thenReturn(
+          nodeIdToSchedulerNodes.get(nodeId));
+    }
+    when(cs.getAllNodes()).thenReturn(nodeIdToSchedulerNodes);
+    ParentQueue root = mockQueueHierarchy(queuesConfig);
+    when(cs.getRootQueue()).thenReturn(root);
+    when(cs.getClusterResource()).thenReturn(clusterResource);
+    mockApplications(appsConfig);
+
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
+        mClock);
+  }
+
+  private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
+      String queueName, List<RMContainer> reservedContainers,
+      List<RMContainer> liveContainers) {
+    int containerId = 1;
+    int start = containersConfig.indexOf("=") + 1;
+    int end = -1;
+
+    while (start < containersConfig.length()) {
+      while (start < containersConfig.length()
+          && containersConfig.charAt(start) != '(') {
+        start++;
+      }
+      if (start >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+      end = start + 1;
+      while (end < containersConfig.length()
+          && containersConfig.charAt(end) != ')') {
+        end++;
+      }
+      if (end >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+
+      // now we found start/end, get container values
+      String[] values = containersConfig.substring(start + 1, end).split(",");
+      if (values.length != 6) {
+        throw new IllegalArgumentException("Format to define container is:"
+            + "(priority,resource,host,expression,repeat,reserved)");
+      }
+      Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+      Resource res = parseResourceFromString(values[1]);
+      NodeId host = NodeId.newInstance(values[2], 1);
+      String exp = values[3];
+      int repeat = Integer.valueOf(values[4]);
+      boolean reserved = Boolean.valueOf(values[5]);
+
+      for (int i = 0; i < repeat; i++) {
+        Container c = mock(Container.class);
+        when(c.getResource()).thenReturn(res);
+        when(c.getPriority()).thenReturn(pri);
+        RMContainerImpl rmc = mock(RMContainerImpl.class);
+        when(rmc.getAllocatedNode()).thenReturn(host);
+        when(rmc.getNodeLabelExpression()).thenReturn(exp);
+        when(rmc.getAllocatedResource()).thenReturn(res);
+        when(rmc.getContainer()).thenReturn(c);
+        when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
+        when(rmc.getQueueName()).thenReturn(queueName);
+        final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
+        when(rmc.getContainerId()).thenReturn(
+            cId);
+        doAnswer(new Answer<Integer>() {
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
+                .getContainerId());
+          }
+        }).when(rmc).compareTo(any(RMContainer.class));
+
+        if (containerId == 1) {
+          when(rmc.isAMContainer()).thenReturn(true);
+        }
+
+        if (reserved) {
+          reservedContainers.add(rmc);
+          when(rmc.getReservedResource()).thenReturn(res);
+        } else {
+          liveContainers.add(rmc);
+        }
+
+        // Add container to scheduler-node
+        addContainerToSchedulerNode(host, rmc, reserved);
+
+        // If this is a non-exclusive allocation
+        String partition = null;
+        if (exp.isEmpty()
+            && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
+            .isEmpty()) {
+          LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+              queue.getIgnoreExclusivityRMContainers();
+          if (!ignoreExclusivityContainers.containsKey(partition)) {
+            ignoreExclusivityContainers.put(partition,
+                new TreeSet<RMContainer>());
+          }
+          ignoreExclusivityContainers.get(partition).add(rmc);
+        }
+        LOG.debug("add container to app=" + attemptId + " res=" + res
+            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+            + partition);
+
+        containerId++;
+      }
+
+      start = end + 1;
+    }
+  }
+
+  /**
+   * Format is:
+   * <pre>
+   * queueName\t  // app1
+   * (priority,resource,host,expression,#repeat,reserved)
+   * (priority,resource,host,expression,#repeat,reserved);
+   * queueName\t  // app2
+   * </pre>
+   */
+  private void mockApplications(String appsConfig) {
+    int id = 1;
+    for (String a : appsConfig.split(";")) {
+      String[] strs = a.split("\t");
+      String queueName = strs[0];
+
+      // get containers
+      List<RMContainer> liveContainers = new ArrayList<RMContainer>();
+      List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+      ApplicationId appId = ApplicationId.newInstance(0L, id);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+          liveContainers);
+
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getLiveContainers()).thenReturn(liveContainers);
+      when(app.getReservedContainers()).thenReturn(reservedContainers);
+      when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+      when(app.getApplicationId()).thenReturn(appId);
+      when(app.getPriority()).thenReturn(Priority.newInstance(0));
+
+      // add to LeafQueue
+      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+      queue.getApplications().add(app);
+
+      id++;
+    }
+  }
+
+  private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
+      boolean isReserved) {
+    SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId);
+    assert node != null;
+
+    if (isReserved) {
+      when(node.getReservedContainer()).thenReturn(container);
+    } else {
+      node.getCopiedListOfRunningContainers().add(container);
+      Resources.subtractFrom(node.getAvailableResource(),
+          container.getAllocatedResource());
+    }
+  }
+
+  /**
+   * Format is:
+   * host1=partition[ res=resource];
+   * host2=partition[ res=resource];
+   */
+  private void mockSchedulerNodes(String schedulerNodesConfigStr)
+      throws IOException {
+    String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
+    for (String p : nodesConfigStrArray) {
+      String[] arr = p.split(" ");
+
+      NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1);
+      String partition = arr[0].substring(arr[0].indexOf("=") + 1, arr[0].length());
+
+      FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
+      when(sn.getNodeID()).thenReturn(nodeId);
+      when(sn.getPartition()).thenReturn(partition);
+
+      Resource totalRes = Resources.createResource(0);
+      if (arr.length > 1) {
+        String res = arr[1];
+        if (res.contains("res=")) {
+          String resSring = res.substring(
+              res.indexOf("res=") + "res=".length());
+          totalRes = parseResourceFromString(resSring);
+        }
+      }
+      when(sn.getTotalResource()).thenReturn(totalRes);
+      when(sn.getAvailableResource()).thenReturn(Resources.clone(totalRes));
+
+      // TODO, add settings of killable resources when necessary
+      when(sn.getTotalKillableResources()).thenReturn(Resources.none());
+
+      List<RMContainer> liveContainers = new ArrayList<>();
+      when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
+
+      nodeIdToSchedulerNodes.put(nodeId, sn);
+
+      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+    }
+  }
+
+  /**
+   * Stubs the node-labels manager from a partition description and rebuilds
+   * {@code clusterResource} as the sum of every partition's total resource.
+   * Format is:
+   * <pre>
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * </pre>
+   *
+   * @param nodeLabelsConfigStr ";"-separated partition descriptions
+   * @throws IOException propagated from the label-manager methods being
+   *           stubbed
+   */
+  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
+    String[] entries = nodeLabelsConfigStr.split(";");
+    clusterResource = Resources.createResource(0);
+    for (String entry : entries) {
+      int eqPos = entry.indexOf("=");
+      int commaPos = entry.indexOf(",");
+
+      String label = entry.substring(0, eqPos);
+      Resource total =
+          parseResourceFromString(entry.substring(eqPos + 1, commaPos));
+      boolean exclusive = Boolean.parseBoolean(entry.substring(commaPos + 1));
+
+      when(nlm.getResourceByLabel(eq(label), any(Resource.class)))
+          .thenReturn(total);
+      when(nlm.isExclusiveNodeLabel(eq(label))).thenReturn(exclusive);
+
+      // remember the partition's total and fold it into the cluster total
+      partitionToResource.put(label, total);
+      LOG.debug("add partition=" + label + " totalRes=" + total
+          + " exclusivity=" + exclusive);
+      Resources.addTo(clusterResource, total);
+    }
+
+    when(nlm.getClusterNodeLabelNames()).thenReturn(
+        partitionToResource.keySet());
+  }
+
+  /**
+   * Parses a resource written as one integer, or as two integers separated by
+   * ":" (presumably memory and vcores, in that order).
+   *
+   * @param p textual resource, e.g. "100" or "100:10"
+   * @return the resource built from the parsed integer(s)
+   */
+  private Resource parseResourceFromString(String p) {
+    String[] fields = p.split(":");
+    int first = Integer.parseInt(fields[0]);
+    if (fields.length == 1) {
+      return Resources.createResource(first);
+    }
+    return Resources.createResource(first, Integer.parseInt(fields[1]));
+  }
+
+  /**
+   * Builds one mocked queue per expression and returns the mocked root.
+   * Format is:
+   * <pre>
+   * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * </pre>
+   * ";" splits queues, and there should be no empty lines and no extra spaces.
+   *
+   * @param queueExprs hierarchy expression in the format above
+   * @return the mocked queue whose name equals ROOT, or null if there is none
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private ParentQueue mockQueueHierarchy(String queueExprs) {
+    final String[] exprs = queueExprs.split(";");
+    ParentQueue root = null;
+    int pos = 0;
+    while (pos < exprs.length) {
+      CSQueue mocked;
+
+      if (!isParent(exprs, pos)) {
+        // Leaf queue: applications are kept sorted by application id and the
+        // ordering policy hands them out for preemption in descending order.
+        final TreeSet<FiCaSchedulerApp> sortedApps = new TreeSet<>(
+            new Comparator<FiCaSchedulerApp>() {
+              @Override
+              public int compare(FiCaSchedulerApp left,
+                  FiCaSchedulerApp right) {
+                return left.getApplicationId().compareTo(
+                    right.getApplicationId());
+              }
+            });
+        OrderingPolicy<FiCaSchedulerApp> orderingPolicy =
+            mock(OrderingPolicy.class);
+        when(orderingPolicy.getPreemptionIterator()).thenAnswer(new Answer() {
+          @Override
+          public Object answer(InvocationOnMock invocation) {
+            return sortedApps.descendingIterator();
+          }
+        });
+        Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+            new HashMap<>();
+
+        LeafQueue leaf = mock(LeafQueue.class);
+        when(leaf.getApplications()).thenReturn(sortedApps);
+        when(leaf.getOrderingPolicy()).thenReturn(orderingPolicy);
+        when(leaf.getIgnoreExclusivityRMContainers()).thenReturn(
+            ignoreExclusivityContainers);
+        mocked = leaf;
+      } else {
+        // Parent queue: starts with an empty, mutable child list that
+        // setupQueue fills in as children are processed.
+        ParentQueue parent = mock(ParentQueue.class);
+        List<CSQueue> children = new ArrayList<CSQueue>();
+        when(parent.getChildQueues()).thenReturn(children);
+        mocked = parent;
+      }
+
+      setupQueue(mocked, exprs[pos], exprs, pos);
+      if (mocked.getQueueName().equals(ROOT)) {
+        root = (ParentQueue) mocked;
+      }
+      pos++;
+    }
+    return root;
+  }
+
+  /**
+   * Configures one mocked queue from its expression: stubs its name, path and
+   * parent link, fills per-partition capacities and resource usage, stubs the
+   * preemption-disabled flag from the configuration, and registers the queue
+   * by name in {@code nameToCSQueues} and on the mocked scheduler.
+   *
+   * @param queue mocked queue (parent or leaf) to configure
+   * @param q expression of this queue, e.g. {@code --a1(=[50 100 50 100])}
+   * @param queueExprArray expressions of all queues, in declaration order
+   * @param idx position of {@code q} inside {@code queueExprArray}
+   */
+  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+      int idx) {
+    LOG.debug("*** Setup queue, source=" + q);
+
+    int myLevel = getLevel(q);
+    // A level-0 queue is the root; it has no parent to derive a path from.
+    String queuePath = (0 == myLevel) ? ROOT : null;
+
+    // The name is always taken from the expression; a separate ROOT stub for
+    // level 0 would be overwritten by this one anyway.
+    String queueName = getQueueName(q);
+    when(queue.getQueueName()).thenReturn(queueName);
+
+    // Setup parent queue, and add myself to parentQueue.children-list
+    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+    if (null != parentQueue) {
+      when(queue.getParent()).thenReturn(parentQueue);
+      parentQueue.getChildQueues().add(queue);
+
+      // Setup my path
+      queuePath = parentQueue.getQueuePath() + "." + queueName;
+    }
+    when(queue.getQueuePath()).thenReturn(queuePath);
+
+    QueueCapacities qc = new QueueCapacities(0 == myLevel);
+    ResourceUsage ru = new ResourceUsage();
+
+    when(queue.getQueueCapacities()).thenReturn(qc);
+    when(queue.getQueueResourceUsage()).thenReturn(ru);
+
+    LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+        + queue.getQueuePath());
+    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+        .getQueueName()));
+
+    // Whether this queue is a leaf does not depend on the partition, so
+    // decide it once instead of once per partition.
+    boolean isLeaf = !isParent(queueExprArray, idx);
+
+    // Add a small epsilon to capacities to avoid truncate when doing
+    // Resources.multiply
+    final float epsilon = 1e-6f;
+
+    // Setup other fields like used resource, guaranteed resource, etc.
+    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+    for (String s : capacitySettingStr.split(",")) {
+      String partitionName = s.substring(0, s.indexOf("="));
+      // values = guaranteed, max, used, pending and optionally reserved
+      String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+      Resource totResourcePerPartition = partitionToResource.get(partitionName);
+      Resource used = parseResourceFromString(values[2].trim());
+      float absGuaranteed = Resources.divide(rc, totResourcePerPartition,
+          parseResourceFromString(values[0].trim()), totResourcePerPartition)
+          + epsilon;
+      float absMax = Resources.divide(rc, totResourcePerPartition,
+          parseResourceFromString(values[1].trim()), totResourcePerPartition)
+          + epsilon;
+      float absUsed = Resources.divide(rc, totResourcePerPartition,
+          used, totResourcePerPartition)
+          + epsilon;
+      Resource pending = parseResourceFromString(values[3].trim());
+      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      ru.setPending(partitionName, pending);
+      if (isLeaf) {
+        LeafQueue lq = (LeafQueue) queue;
+        // NOTE(review): the matchers accept any partition, so with several
+        // partitions the last one's pending value is returned for all of
+        // them - confirm this is intended.
+        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+            isA(String.class))).thenReturn(pending);
+      }
+      ru.setUsed(partitionName, used);
+
+      // Setup reserved resource if it is contained in the input config
+      Resource reserved = Resources.none();
+      if (values.length == 5) {
+        reserved = parseResourceFromString(values[4].trim());
+        ru.setReserved(partitionName, reserved);
+      }
+      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+          + ",abs_used=" + absUsed + ",pending_resource=" + pending
+          + ", reserved_resource=" + reserved + "]");
+    }
+
+    // Setup preemption disabled
+    when(queue.getPreemptionDisabled()).thenReturn(
+        conf.getPreemptionDisabled(queuePath, false));
+
+    nameToCSQueues.put(queueName, queue);
+    when(cs.getQueue(eq(queueName))).thenReturn(queue);
+  }
+
+  /**
+   * Counts the leading "-" characters of a queue expression; that count is the
+   * queue's level, so the root (no leading dash) is level 0.
+   */
+  private int getLevel(String q) {
+    for (int i = 0; i < q.length(); i++) {
+      if (q.charAt(i) != '-') {
+        return i;
+      }
+    }
+    // empty string, or nothing but dashes
+    return q.length();
+  }
+
+  /**
+   * Extracts the queue name from a queue expression: the text after the
+   * leading "-" characters and before the first "(".
+   *
+   * @param q queue expression, e.g. {@code --a1(=[50 100 50 100])}
+   * @return the queue name, e.g. {@code a1}
+   * @throws IllegalArgumentException if the expression is only dashes (or
+   *           empty), has no "(", has an empty name, or the name contains "."
+   */
+  private String getQueueName(String q) {
+    int idx = 0;
+    // find first != '-' char
+    while (idx < q.length() && q.charAt(idx) == '-') {
+      idx++;
+    }
+    if (idx == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    // Everything before idx is '-', so searching from idx finds the same '('
+    // as searching from the start. Reject a missing '(' explicitly instead of
+    // letting substring fail with an index error.
+    int parenIdx = q.indexOf('(', idx);
+    if (parenIdx < 0) {
+      throw new IllegalArgumentException("illegal input, missing '(':" + q);
+    }
+    // name = after '-' and before '('
+    String name = q.substring(idx, parenIdx);
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  /**
+   * Walks backwards from the entry before {@code idx} and returns the already
+   * registered queue of the first expression whose level is lower than
+   * {@code myLevel}.
+   *
+   * @return that queue looked up in nameToCSQueues, or null when no earlier
+   *         expression has a lower level
+   */
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
+    for (int i = idx - 1; i >= 0; i--) {
+      String candidate = queueExprArray[i];
+      if (getLevel(candidate) < myLevel) {
+        return (ParentQueue) nameToCSQueues.get(getQueueName(candidate));
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether the queue at {@code idx} is a ParentQueue, i.e. whether the
+   * expression immediately after it is one of its children (has a deeper
+   * level).
+   *
+   * Only the next entry is inspected. Skipping over same-level siblings first
+   * would misreport a leaf as a parent when a later sibling has children,
+   * e.g. "-a" in "-a;-b;--b1".
+   *
+   * @param queues all queue expressions, in declaration order
+   * @param idx index of the queue to classify
+   * @return true if the queue has at least one child, false if it is a leaf
+   */
+  private boolean isParent(String[] queues, int idx) {
+    int myLevel = getLevel(queues[idx]);
+    int next = idx + 1;
+    return next < queues.length && getLevel(queues[next]) > myLevel;
+  }
+
+  /**
+   * Builds the attempt id (attempt number 1) of the application whose id is
+   * created from cluster timestamp 0 and the given sequence number.
+   */
+  public ApplicationAttemptId getAppAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, id), 1);
+  }
+
+  /**
+   * Asserts how many of the app's live plus reserved containers are allocated
+   * on the node {@code host:1}.
+   */
+  public void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    final NodeId expectedNode = NodeId.newInstance(host, 1);
+    int matched = 0;
+    for (RMContainer live : app.getLiveContainers()) {
+      if (!live.getAllocatedNode().equals(expectedNode)) {
+        continue;
+      }
+      matched++;
+    }
+    for (RMContainer reserved : app.getReservedContainers()) {
+      if (!reserved.getAllocatedNode().equals(expectedNode)) {
+        continue;
+      }
+      matched++;
+    }
+    Assert.assertEquals(expectedContainersNumber, matched);
+  }
+
+  /**
+   * Looks up, in the given leaf queue, the application whose application id
+   * has the given sequence number.
+   *
+   * @return the matching application, or null if the queue has none
+   */
+  public FiCaSchedulerApp getApp(String queueName, int appId) {
+    LeafQueue leaf = (LeafQueue) cs.getQueue(queueName);
+    FiCaSchedulerApp found = null;
+    for (FiCaSchedulerApp candidate : leaf.getApplications()) {
+      if (candidate.getApplicationId().getId() == appId) {
+        found = candidate;
+        break;
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Asserts the queue's absolute guaranteed, maximum and used capacities for
+   * one partition, each within a tolerance of 1e-3.
+   */
+  public void checkAbsCapacities(CSQueue queue, String partition,
+      float guaranteed, float max, float used) {
+    final double tolerance = 1e-3;
+    QueueCapacities capacities = queue.getQueueCapacities();
+    Assert.assertEquals(guaranteed,
+        capacities.getAbsoluteCapacity(partition), tolerance);
+    Assert.assertEquals(max,
+        capacities.getAbsoluteMaximumCapacity(partition), tolerance);
+    Assert.assertEquals(used,
+        capacities.getAbsoluteUsedCapacity(partition), tolerance);
+  }
+
+  /**
+   * Asserts the memory size of the queue's pending resource in a partition.
+   */
+  public void checkPendingResource(CSQueue queue, String partition, int pending) {
+    Resource actualPending =
+        queue.getQueueResourceUsage().getPending(partition);
+    Assert.assertEquals(pending, actualPending.getMemorySize());
+  }
+
+  /**
+   * Asserts the memory size of the queue's reserved resource in a partition.
+   */
+  public void checkReservedResource(CSQueue queue, String partition, int reserved) {
+    Resource actualReserved =
+        queue.getQueueResourceUsage().getReserved(partition);
+    Assert.assertEquals(reserved, actualReserved.getMemorySize());
+  }
+
+  /**
+   * Matches a ContainerPreemptEvent that targets a given application attempt
+   * and whose container reports a given queue name and allocated node.
+   */
+  static class IsPreemptionRequestForQueueAndNode
+      extends ArgumentMatcher<ContainerPreemptEvent> {
+    private final ApplicationAttemptId appAttId;
+    private final String queueName;
+    private final NodeId nodeId;
+
+    IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
+        String queueName, NodeId nodeId) {
+      this.appAttId = appAttId;
+      this.queueName = queueName;
+      this.nodeId = nodeId;
+    }
+
+    /** Checks attempt id, then queue name, then node; stops at first miss. */
+    @Override
+    public boolean matches(Object o) {
+      ContainerPreemptEvent event = (ContainerPreemptEvent) o;
+      if (!appAttId.equals(event.getAppId())) {
+        return false;
+      }
+      if (!queueName.equals(event.getContainer().getQueueName())) {
+        return false;
+      }
+      return nodeId.equals(event.getContainer().getAllocatedNode());
+    }
+
+    /** Describes the matcher by the expected attempt id only. */
+    @Override
+    public String toString() {
+      return appAttId.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index f4fcfb9..13167b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -62,6 +62,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
@@ -161,6 +162,11 @@ public class TestProportionalCapacityPreemptionPolicy {
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
     lm = mock(RMNodeLabelsManager.class);
+    try {
+      when(lm.isExclusiveNodeLabel(anyString())).thenReturn(true);
+    } catch (IOException e) {
+      // do nothing
+    }
     when(mCS.getConfiguration()).thenReturn(conf);
     rmContext = mock(RMContext.class);
     when(mCS.getRMContext()).thenReturn(rmContext);
@@ -650,6 +656,26 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
+  public void testHierarchicalWithReserved() {
+    // One column per queue (header row below), one row per property (label in
+    // the trailing comment). This case adds a "reserved" row to the matrix.
+    int[][] qData = new int[][] {
+        //  /    A   B   C    D   E   F
+        { 200, 100, 50, 50, 100, 10, 90 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 200, 110, 60, 50,  90, 90,  0 },  // used
+        {  10,   0,  0,  0,  10,  0, 10 },  // pending
+        {  40,  25,  15, 10, 15,  15,  0 },  // reserved
+        {   4,   2,  1,  1,   2,  1,  1 },  // apps
+        {  -1,  -1,  1,  1,  -1,  1,  1 },  // req granularity
+        {   2,   2,  0,  0,   2,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from A1, not B1 despite B1 being far over
+    // its absolute guaranteed capacity
+    // NOTE(review): the columns above are labelled A..F, not A1/B1 - the
+    // comment above may have been carried over from another test; confirm.
+    // Expect exactly 10 preemption events matching appA.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
   public void testZeroGuar() {
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F
@@ -938,12 +964,42 @@ public class TestProportionalCapacityPreemptionPolicy {
         "queueA").get("");
     assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
     long extraForQueueA =
-        tempQueueAPartition.current.getMemorySize() - tempQueueAPartition.guaranteed
+        tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition.getGuaranteed()
             .getMemorySize();
     assertEquals(extraForQueueA,
         tempQueueAPartition.preemptableExtra.getMemorySize());
   }
 
+  @Test
+  public void testHierarchicalLarge3LevelsWithReserved() {
+    // One column per queue (the three header rows show the nesting), one row
+    // per property (label in the trailing comment).
+    int[][] qData = new int[][] {
+        //  /    A                      F               I
+        //            B    C                  G    H          J    K
+        //                    D    E
+        { 400, 200,  60, 140, 100, 40,  100,  70,  30, 100,  10,  90 },  // abs
+        { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 },  // maxCap
+        { 400, 210,  60, 150, 100, 50,  100,  50,  50,  90,  10,   80 },  // used
+        {  10,   0,   0,   0,  0,   0,   0,   0,   0,   0,   0,  10 },  // pending
+        {  50,  30,  20,   10, 5,   5,   0,   0,   0,  10,  10,   0 },  // reserved
+        //          appA     appB appC   appD appE      appF appG
+        {   7,   3,   1,   2,   1,   1,  2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   -1,  1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+        {   3,   2,   0,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // Expect exactly 10 preemption events matching appC, and 10 units of
+    // preemptable extra recorded for queueE in the default ("") partition.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemorySize());
+    // 2nd level child (E) preempts 10, but parent A has only 9 extra.
+    // Check that the parent can preempt only the extra from a child more
+    // than two levels down.
+    // NOTE(review): the matrix above gives A used=210 and abs=200, i.e. 10
+    // extra rather than 9 - confirm which figure is intended.
+    TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get("");
+    assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
+    // queueA's preemptable extra must equal its used minus guaranteed memory.
+    long extraForQueueA = tempQueueAPartition.getUsed().getMemorySize()
+        - tempQueueAPartition.getGuaranteed().getMemorySize();
+    assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemorySize());
+  }
+
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -1064,6 +1120,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
     ResourceUsage resUsage = new ResourceUsage();
     resUsage.setUsed(used[0]);
+    resUsage.setReserved(reserved[0]);
     when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     when(root.getAbsoluteUsedCapacity()).thenReturn(
         Resources.divide(rc, tot, used[0], tot));
@@ -1089,6 +1146,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         q = mockParentQueue(p, queues[i], pqs);
         ResourceUsage resUsagePerQueue = new ResourceUsage();
         resUsagePerQueue.setUsed(used[i]);
+        resUsagePerQueue.setReserved(reserved[i]);
         when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
       } else {
         q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
@@ -1165,6 +1223,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     ResourceUsage ru = new ResourceUsage();
     ru.setPending(pending[i]);
     ru.setUsed(used[i]);
+    ru.setReserved(reserved[i]);
     when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 837c95f..e31a889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -18,231 +18,23 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
-  private static final Log LOG =
-      LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
-  static final String ROOT = CapacitySchedulerConfiguration.ROOT;
 
-  private Map<String, CSQueue> nameToCSQueues = null;
-  private Map<String, Resource> partitionToResource = null;
-  private Map<NodeId, SchedulerNode> nodeIdToSchedulerNodes = null;
-  private RMNodeLabelsManager nlm = null;
-  private RMContext rmContext = null;
-
-  private ResourceCalculator rc = new DefaultResourceCalculator();
-  private Clock mClock = null;
-  private CapacitySchedulerConfiguration conf = null;
-  private CapacityScheduler cs = null;
-  private EventHandler<SchedulerEvent> mDisp = null;
-  private ProportionalCapacityPreemptionPolicy policy = null;
-  private Resource clusterResource = null;
-
-  @SuppressWarnings("unchecked")
+public class TestProportionalCapacityPreemptionPolicyForNodePartitions
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
   @Before
   public void setup() {
-    org.apache.log4j.Logger.getRootLogger().setLevel(
-        org.apache.log4j.Level.DEBUG);
-
-    conf = new CapacitySchedulerConfiguration(new Configuration(false));
-    conf.setLong(
-        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
-    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
-        3000);
-    // report "ideal" preempt
-    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
-        (float) 1.0);
-    conf.setFloat(
-        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
-        (float) 1.0);
-
-    mClock = mock(Clock.class);
-    cs = mock(CapacityScheduler.class);
-    when(cs.getResourceCalculator()).thenReturn(rc);
-    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
-    when(cs.getConfiguration()).thenReturn(conf);
-
-    nlm = mock(RMNodeLabelsManager.class);
-    mDisp = mock(EventHandler.class);
-
-    rmContext = mock(RMContext.class);
-    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
-    Dispatcher disp = mock(Dispatcher.class);
-    when(rmContext.getDispatcher()).thenReturn(disp);
-    when(disp.getEventHandler()).thenReturn(mDisp);
-    when(cs.getRMContext()).thenReturn(rmContext);
-
+    super.setup();
     policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
-    partitionToResource = new HashMap<>();
-    nodeIdToSchedulerNodes = new HashMap<>();
-    nameToCSQueues = new HashMap<>();
-  }
-
-  @Test
-  public void testBuilder() throws Exception {
-    /**
-     * Test of test, make sure we build expected mock schedulable objects
-     */
-    String labelsConfig =
-        "=200,true;" + // default partition
-        "red=100,false;" + // partition=red
-        "blue=200,true"; // partition=blue
-    String nodesConfig =
-        "n1=red;" + // n1 has partition=red
-        "n2=blue;" + // n2 has partition=blue
-        "n3="; // n3 doesn't have partition
-    String queuesConfig =
-        // guaranteed,max,used,pending
-        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
-        "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
-        "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
-        "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
-        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
-    String appsConfig=
-        //queueName\t(priority,resource,host,expression,#repeat,reserved)
-        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
-        "a1\t" // app1 in a1
-        + "(1,1,n3,red,50,false);" + // 50 * default in n3
-
-        "a1\t" // app2 in a1
-        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
-                                                // 50 * ignore-exclusivity (allocated)
-        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
-                                                          // 50 in n2 (allocated)
-        "a2\t" // app3 in a2
-        + "(1,1,n3,red,50,false);" + // 50 * default in n3
-
-        "b\t" // app4 in b
-        + "(1,1,n1,red,100,false);";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-
-    // Check queues:
-    // root
-    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
-    checkPendingResource(cs.getQueue("root"), "", 100);
-    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
-    checkPendingResource(cs.getQueue("root"), "red", 100);
-    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
-    checkPendingResource(cs.getQueue("root"), "blue", 200);
-
-    // a
-    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
-    checkPendingResource(cs.getQueue("a"), "", 100);
-    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
-    checkPendingResource(cs.getQueue("a"), "red", 0);
-    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
-    checkPendingResource(cs.getQueue("a"), "blue", 200);
-
-    // a1
-    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
-    checkPendingResource(cs.getQueue("a1"), "", 100);
-    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
-    checkPendingResource(cs.getQueue("a1"), "red", 0);
-    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
-    checkPendingResource(cs.getQueue("a1"), "blue", 0);
-
-    // a2
-    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
-    checkPendingResource(cs.getQueue("a2"), "", 0);
-    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
-    checkPendingResource(cs.getQueue("a2"), "red", 0);
-    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
-    checkPendingResource(cs.getQueue("a2"), "blue", 200);
-
-    // b1
-    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
-    checkPendingResource(cs.getQueue("b"), "", 0);
-    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
-    checkPendingResource(cs.getQueue("b"), "red", 100);
-    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
-    checkPendingResource(cs.getQueue("b"), "blue", 0);
-
-    // Check ignored partitioned containers in queue
-    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
-        .getIgnoreExclusivityRMContainers().get("blue").size());
-
-    // Check applications
-    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
-    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
-    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
-
-    // Check #containers
-    FiCaSchedulerApp app1 = getApp("a1", 1);
-    FiCaSchedulerApp app2 = getApp("a1", 2);
-    FiCaSchedulerApp app3 = getApp("a2", 3);
-    FiCaSchedulerApp app4 = getApp("b", 4);
-
-    Assert.assertEquals(50, app1.getLiveContainers().size());
-    checkContainerNodesInApp(app1, 50, "n3");
-
-    Assert.assertEquals(50, app2.getLiveContainers().size());
-    Assert.assertEquals(150, app2.getReservedContainers().size());
-    checkContainerNodesInApp(app2, 200, "n2");
-
-    Assert.assertEquals(50, app3.getLiveContainers().size());
-    checkContainerNodesInApp(app3, 50, "n3");
-
-    Assert.assertEquals(100, app4.getLiveContainers().size());
-    checkContainerNodesInApp(app4, 100, "n1");
   }
 
   @Test
@@ -822,477 +614,4 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
-
-  private ApplicationAttemptId getAppAttemptId(int id) {
-    ApplicationId appId = ApplicationId.newInstance(0L, id);
-    ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 1);
-    return appAttemptId;
-  }
-
-  private void checkContainerNodesInApp(FiCaSchedulerApp app,
-      int expectedContainersNumber, String host) {
-    NodeId nodeId = NodeId.newInstance(host, 1);
-    int num = 0;
-    for (RMContainer c : app.getLiveContainers()) {
-      if (c.getAllocatedNode().equals(nodeId)) {
-        num++;
-      }
-    }
-    for (RMContainer c : app.getReservedContainers()) {
-      if (c.getAllocatedNode().equals(nodeId)) {
-        num++;
-      }
-    }
-    Assert.assertEquals(expectedContainersNumber, num);
-  }
-
-  private FiCaSchedulerApp getApp(String queueName, int appId) {
-    for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
-        .getApplications()) {
-      if (app.getApplicationId().getId() == appId) {
-        return app;
-      }
-    }
-    return null;
-  }
-
-  private void checkAbsCapacities(CSQueue queue, String partition,
-      float guaranteed, float max, float used) {
-    QueueCapacities qc = queue.getQueueCapacities();
-    Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
-    Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
-    Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
-  }
-
-  private void checkPendingResource(CSQueue queue, String partition, int pending) {
-    ResourceUsage ru = queue.getQueueResourceUsage();
-    Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
-  }
-
-  private void buildEnv(String labelsConfig, String nodesConfig,
-      String queuesConfig, String appsConfig) throws IOException {
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
-  }
-
-  private void buildEnv(String labelsConfig, String nodesConfig,
-      String queuesConfig, String appsConfig,
-      boolean useDominantResourceCalculator) throws IOException {
-    if (useDominantResourceCalculator) {
-      when(cs.getResourceCalculator()).thenReturn(
-          new DominantResourceCalculator());
-    }
-    mockNodeLabelsManager(labelsConfig);
-    mockSchedulerNodes(nodesConfig);
-    for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
-      when(cs.getSchedulerNode(nodeId)).thenReturn(
-          nodeIdToSchedulerNodes.get(nodeId));
-    }
-    ParentQueue root = mockQueueHierarchy(queuesConfig);
-    when(cs.getRootQueue()).thenReturn(root);
-    when(cs.getClusterResource()).thenReturn(clusterResource);
-    mockApplications(appsConfig);
-
-    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
-        mClock);
-  }
-
-  private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
-      String queueName, List<RMContainer> reservedContainers,
-      List<RMContainer> liveContainers) {
-    int containerId = 1;
-    int start = containersConfig.indexOf("=") + 1;
-    int end = -1;
-
-    while (start < containersConfig.length()) {
-      while (start < containersConfig.length()
-          && containersConfig.charAt(start) != '(') {
-        start++;
-      }
-      if (start >= containersConfig.length()) {
-        throw new IllegalArgumentException(
-            "Error containers specification, line=" + containersConfig);
-      }
-      end = start + 1;
-      while (end < containersConfig.length()
-          && containersConfig.charAt(end) != ')') {
-        end++;
-      }
-      if (end >= containersConfig.length()) {
-        throw new IllegalArgumentException(
-            "Error containers specification, line=" + containersConfig);
-      }
-
-      // now we found start/end, get container values
-      String[] values = containersConfig.substring(start + 1, end).split(",");
-      if (values.length != 6) {
-        throw new IllegalArgumentException("Format to define container is:"
-            + "(priority,resource,host,expression,repeat,reserved)");
-      }
-      Priority pri = Priority.newInstance(Integer.parseInt(values[0]));
-      Resource res = parseResourceFromString(values[1]);
-      NodeId host = NodeId.newInstance(values[2], 1);
-      String exp = values[3];
-      int repeat = Integer.parseInt(values[4]);
-      boolean reserved = Boolean.parseBoolean(values[5]);
-
-      for (int i = 0; i < repeat; i++) {
-        Container c = mock(Container.class);
-        when(c.getResource()).thenReturn(res);
-        when(c.getPriority()).thenReturn(pri);
-        RMContainerImpl rmc = mock(RMContainerImpl.class);
-        when(rmc.getAllocatedNode()).thenReturn(host);
-        when(rmc.getNodeLabelExpression()).thenReturn(exp);
-        when(rmc.getAllocatedResource()).thenReturn(res);
-        when(rmc.getContainer()).thenReturn(c);
-        when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
-        final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
-        when(rmc.getContainerId()).thenReturn(
-            cId);
-        doAnswer(new Answer<Integer>() {
-          @Override
-          public Integer answer(InvocationOnMock invocation) throws Throwable {
-            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
-                .getContainerId());
-          }
-        }).when(rmc).compareTo(any(RMContainer.class));
-
-        if (containerId == 1) {
-          when(rmc.isAMContainer()).thenReturn(true);
-        }
-
-        if (reserved) {
-          reservedContainers.add(rmc);
-        } else {
-          liveContainers.add(rmc);
-        }
-
-        // If this is a non-exclusive allocation
-        String partition = null;
-        if (exp.isEmpty()
-            && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
-                .isEmpty()) {
-          LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
-          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
-              queue.getIgnoreExclusivityRMContainers();
-          if (!ignoreExclusivityContainers.containsKey(partition)) {
-            ignoreExclusivityContainers.put(partition,
-                new TreeSet<RMContainer>());
-          }
-          ignoreExclusivityContainers.get(partition).add(rmc);
-        }
-        LOG.debug("add container to app=" + attemptId + " res=" + res
-            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
-            + partition);
-
-        containerId++;
-      }
-
-      start = end + 1;
-    }
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * queueName\t  // app1
-   * (priority,resource,host,expression,#repeat,reserved)
-   * (priority,resource,host,expression,#repeat,reserved);
-   * queueName\t  // app2
-   * </pre>
-   */
-  private void mockApplications(String appsConfig) {
-    int id = 1;
-    for (String a : appsConfig.split(";")) {
-      String[] strs = a.split("\t");
-      String queueName = strs[0];
-
-      // get containers
-      List<RMContainer> liveContainers = new ArrayList<RMContainer>();
-      List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-      ApplicationId appId = ApplicationId.newInstance(0L, id);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-
-      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
-          liveContainers);
-
-      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
-      when(app.getLiveContainers()).thenReturn(liveContainers);
-      when(app.getReservedContainers()).thenReturn(reservedContainers);
-      when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
-      when(app.getApplicationId()).thenReturn(appId);
-      when(app.getPriority()).thenReturn(Priority.newInstance(0));
-
-      // add to LeafQueue
-      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
-      queue.getApplications().add(app);
-
-      id++;
-    }
-  }
-
-  /**
-   * Format is:
-   * host1=partition;
-   * host2=partition;
-   */
-  private void mockSchedulerNodes(String schedulerNodesConfigStr)
-      throws IOException {
-    String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
-    for (String p : nodesConfigStrArray) {
-      NodeId nodeId = NodeId.newInstance(p.substring(0, p.indexOf("=")), 1);
-      String partition = p.substring(p.indexOf("=") + 1, p.length());
-
-      SchedulerNode sn = mock(SchedulerNode.class);
-      when(sn.getNodeID()).thenReturn(nodeId);
-      when(sn.getPartition()).thenReturn(partition);
-      nodeIdToSchedulerNodes.put(nodeId, sn);
-
-      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
-    }
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * partition0=total_resource,exclusivity;
-   * partition1=total_resource,exclusivity;
-   * ...
-   * </pre>
-   */
-  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
-    String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
-    clusterResource = Resources.createResource(0);
-    for (String p : partitionConfigArr) {
-      String partitionName = p.substring(0, p.indexOf("="));
-      Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
-          p.indexOf(",")));
-     boolean exclusivity =
-          Boolean.parseBoolean(p.substring(p.indexOf(",") + 1, p.length()));
-      when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
-          .thenReturn(res);
-      when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
-
-      // add to partition to resource
-      partitionToResource.put(partitionName, res);
-      LOG.debug("add partition=" + partitionName + " totalRes=" + res
-          + " exclusivity=" + exclusivity);
-      Resources.addTo(clusterResource, res);
-    }
-
-    when(nlm.getClusterNodeLabelNames()).thenReturn(
-        partitionToResource.keySet());
-  }
-
-  private Resource parseResourceFromString(String p) {
-    String[] resource = p.split(":");
-    Resource res = Resources.createResource(0);
-    if (resource.length == 1) {
-      res = Resources.createResource(Integer.parseInt(resource[0]));
-    } else {
-      res = Resources.createResource(Integer.parseInt(resource[0]),
-          Integer.parseInt(resource[1]));
-    }
-    return res;
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * root (<partition-name-1>=[guaranteed max used pending],<partition-name-2>=..);
-   * -A(...);
-   * --A1(...);
-   * --A2(...);
-   * -B...
-   * </pre>
-   * ";" splits queues, and there should no empty lines, no extra spaces
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  private ParentQueue mockQueueHierarchy(String queueExprs) {
-    String[] queueExprArray = queueExprs.split(";");
-    ParentQueue rootQueue = null;
-    for (int idx = 0; idx < queueExprArray.length; idx++) {
-      String q = queueExprArray[idx];
-      CSQueue queue;
-
-      // Initialize queue
-      if (isParent(queueExprArray, idx)) {
-        ParentQueue parentQueue = mock(ParentQueue.class);
-        queue = parentQueue;
-        List<CSQueue> children = new ArrayList<CSQueue>();
-        when(parentQueue.getChildQueues()).thenReturn(children);
-      } else {
-        LeafQueue leafQueue = mock(LeafQueue.class);
-        final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
-            new Comparator<FiCaSchedulerApp>() {
-              @Override
-              public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-                return a1.getApplicationId().compareTo(a2.getApplicationId());
-              }
-            });
-        when(leafQueue.getApplications()).thenReturn(apps);
-        OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
-        when(so.getPreemptionIterator()).thenAnswer(new Answer() {
-          public Object answer(InvocationOnMock invocation) {
-            return apps.descendingIterator();
-          }
-        });
-        when(leafQueue.getOrderingPolicy()).thenReturn(so);
-
-        Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
-            new HashMap<>();
-        when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
-            ignorePartitionContainers);
-        queue = leafQueue;
-      }
-
-      setupQueue(queue, q, queueExprArray, idx);
-      if (queue.getQueueName().equals(ROOT)) {
-        rootQueue = (ParentQueue) queue;
-      }
-    }
-    return rootQueue;
-  }
-
-  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
-      int idx) {
-    LOG.debug("*** Setup queue, source=" + q);
-    String queuePath = null;
-
-    int myLevel = getLevel(q);
-    if (0 == myLevel) {
-      // It's root
-      when(queue.getQueueName()).thenReturn(ROOT);
-      queuePath = ROOT;
-    }
-
-    String queueName = getQueueName(q);
-    when(queue.getQueueName()).thenReturn(queueName);
-
-    // Setup parent queue, and add myself to parentQueue.children-list
-    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
-    if (null != parentQueue) {
-      when(queue.getParent()).thenReturn(parentQueue);
-      parentQueue.getChildQueues().add(queue);
-
-      // Setup my path
-      queuePath = parentQueue.getQueuePath() + "." + queueName;
-    }
-    when(queue.getQueuePath()).thenReturn(queuePath);
-
-    QueueCapacities qc = new QueueCapacities(0 == myLevel);
-    ResourceUsage ru = new ResourceUsage();
-
-    when(queue.getQueueCapacities()).thenReturn(qc);
-    when(queue.getQueueResourceUsage()).thenReturn(ru);
-
-    LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
-        + queue.getQueuePath());
-    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
-        .getQueueName()));
-
-    // Setup other fields like used resource, guaranteed resource, etc.
-    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
-    for (String s : capacitySettingStr.split(",")) {
-      String partitionName = s.substring(0, s.indexOf("="));
-      String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
-      // Add a small epsilon to capacities to avoid truncate when doing
-      // Resources.multiply
-      float epsilon = 1e-6f;
-      Resource totResoucePerPartition = partitionToResource.get(partitionName);
-      float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[0].trim()), totResoucePerPartition)
-          + epsilon;
-      float absMax = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[1].trim()), totResoucePerPartition)
-          + epsilon;
-      float absUsed = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[2].trim()), totResoucePerPartition)
-          + epsilon;
-      Resource pending = parseResourceFromString(values[3].trim());
-      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
-      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
-      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
-      ru.setPending(partitionName, pending);
-      if (!isParent(queueExprArray, idx)) {
-        LeafQueue lq = (LeafQueue) queue;
-        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
-            isA(String.class))).thenReturn(pending);
-      }
-      ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
-      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
-          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
-          + ",abs_used" + absUsed + ",pending_resource=" + pending + "]");
-    }
-
-    // Setup preemption disabled
-    when(queue.getPreemptionDisabled()).thenReturn(
-        conf.getPreemptionDisabled(queuePath, false));
-
-    nameToCSQueues.put(queueName, queue);
-    when(cs.getQueue(eq(queueName))).thenReturn(queue);
-  }
-
-  /**
-   * Level of a queue is how many "-" at beginning, root's level is 0
-   */
-  private int getLevel(String q) {
-    int level = 0; // level = how many "-" at beginning
-    while (level < q.length() && q.charAt(level) == '-') {
-      level++;
-    }
-    return level;
-  }
-
-  private String getQueueName(String q) {
-    int idx = 0;
-    // find first != '-' char
-    while (idx < q.length() && q.charAt(idx) == '-') {
-      idx++;
-    }
-    if (idx == q.length()) {
-      throw new IllegalArgumentException("illegal input:" + q);
-    }
-    // name = after '-' and before '('
-    String name = q.substring(idx, q.indexOf('('));
-    if (name.isEmpty()) {
-      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
-    }
-    if (name.contains(".")) {
-      throw new IllegalArgumentException("queue name shouldn't contain '.':"
-          + name);
-    }
-    return name;
-  }
-
-  private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
-    idx--;
-    while (idx >= 0) {
-      int level = getLevel(queueExprArray[idx]);
-      if (level < myLevel) {
-        String parentQueuName = getQueueName(queueExprArray[idx]);
-        return (ParentQueue) nameToCSQueues.get(parentQueuName);
-      }
-      idx--;
-    }
-
-    return null;
-  }
-
-  /**
-   * Get if a queue is ParentQueue
-   */
-  private boolean isParent(String[] queues, int idx) {
-    int myLevel = getLevel(queues[idx]);
-    idx++;
-    while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
-      idx++;
-    }
-    if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
-      // It's a LeafQueue
-      return false;
-    } else {
-      return true;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
new file mode 100644
index 0000000..38b2e78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestProportionalCapacityPreemptionPolicyForReservedContainers
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        true);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testPreemptionForSimpleReservedContainer() throws IOException {
+    /**
+     * The simplest test of reserved container, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B has an AM container at n1 and reserves 1 container with size = 9 at
+     * n1. n1 then has 4 free, so B needs 5 containers preempted from A at n1
+     * specifically, instead of preempting randomly from n1 and n2.
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+        "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 10 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+        "b\t" // app2 in b
+            + "(1,1,n1,,1,false)" // AM container in n1
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 5 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(5)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testUseReservedAndFifoSelectorTogether() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 30:70
+     * Total cluster resource = 100
+     * - A has 45 containers on two nodes, n1 has 10, n2 has 35, size of each
+     * container is 1.
+     * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+     *   B also has 20 pending resources.
+     * so B needs to preempt:
+     * - 10 containers from n1 (for reserved)
+     * - 5 containers from n2 for pending resources
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+        "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 10]);" + //root
+            "-a(=[30 100 45 0]);" + // a
+            "-b(=[70 100 55 70 50])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n2,,35,false)" // 35 in n2
+            + "(1,1,n1,,10,false);" + // 10 in n1
+            "b\t" // app2 in b
+            + "(1,1,n2,,5,false)" // 5 in n2
+            + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(5)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testReservedSelectorSkipsAMContainer() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 30:70
+     * Total cluster resource = 100
+     * - A has 45 containers on two nodes, n1 has 10, n2 has 35, size of each
+     * container is 1.
+     * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+     *   B also has 20 pending resources.
+     *
+     * Ideally B needs to preempt:
+     * - 10 containers from n1 (for reserved)
+     * - 5 containers from n2 for pending resources
+     *
+     * However, since one AM container is located at n1 (from queueA), we cannot
+     * preempt 10 containers from n1 for reserved container. Instead, we will
+     * preempt 15 containers from n2, since containers from queueA launched in n2
+     * are later than containers from queueA launched in n1 (FIFO order of containers)
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 10]);" + //root
+            "-a(=[30 100 45 0]);" + // a
+            "-b(=[70 100 55 70 50])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,10,false)" // 10 in n1
+            + "(1,1,n2,,35,false);" +// 35 in n2
+            "b\t" // app2 in b
+            + "(1,1,n2,,5,false)" // 5 in n2
+            + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerRespectGuaranteedResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 85:15
+     * Total cluster resource = 100
+     * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B has am container at n1, and reserves 1 container with size = 9 at n1,
+     *
+     * If we preempt 9 containers from queue-A, queue-A will be below its
+     * guaranteed resource = 90 - 9 = 81 < 85.
+     *
+     * So no preemption will take place
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 9 9]);" + //root
+            "-a(=[85 100 90 0]);" + // a
+            "-b(=[15 100 10 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+            "b\t" // app2 in b
+            + "(1,1,n1,,1,false)" // AM container in n1
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerWhichHasAvailableResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B only reserves 1 container with size = 9 at n1 (no AM container).
+     *
+     * n1 has 5 available, so 4 containers should be preempted from A at n1.
+     * (available 5 + preempted 4) = 9
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 99 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 4 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(4)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 47 containers on two nodes, size of each container is 2,
+     *   n1 has 24, n2 has 23
+     * - B reserves 1 container with size = 9 at n1,
+     *
+     * So we can get 4 containers (total-resource = 8) preempted after
+     * preemption. Actual required is 3.5, but we need to preempt integer
+     * number of containers
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 99 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2,n1,,24,false)" // 48 in n1
+            + "(1,2,n2,,23,false);" + // 46 in n2
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 4 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(4)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerRespectAvailableResources()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100, 4 nodes, 25 on each node
+     * - A has 10 containers on every node, size of container is 2
+     * - B reserves 1 container with size = 9 at n1,
+     *
+     * So even if we cannot allocate container for B now, no preemption should
+     * happen since there're plenty of available resources.
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig =
+        "n1= res=25;" +
+            "n2= res=25;" +
+            "n3= res=25;" +
+            "n4= res=25;";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 89 9 9]);" + //root
+            "-a(=[50 100 80 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2,n1,,10,false)" // 20 in n1
+            + "(1,2,n2,,10,false)" // 20 in n2
+            + "(1,2,n3,,10,false)" // 20 in n3
+            + "(1,2,n4,,10,false);" + // 20 in n4
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // No preemption should happen
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n3", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n4", 1))));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/4] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan

Posted by ep...@apache.org.
YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan

(cherry picked from commit bb62e0592566b2fcae7136b30972aad2d3ac55b0)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40367c8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40367c8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40367c8d

Branch: refs/heads/branch-2.8
Commit: 40367c8da3a4798b004714c3fbd69d84c1dc84f5
Parents: bced9e2
Author: Jian He <ji...@apache.org>
Authored: Thu May 5 12:56:21 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Dec 9 20:33:54 2016 +0000

----------------------------------------------------------------------
 .../monitor/SchedulingMonitor.java              |  16 +-
 .../CapacitySchedulerPreemptionUtils.java       |  35 +-
 .../capacity/FifoCandidatesSelector.java        |  26 +-
 .../capacity/PreemptableResourceCalculator.java | 139 ++--
 .../ProportionalCapacityPreemptionPolicy.java   |  96 ++-
 .../ReservedContainerCandidatesSelector.java    | 316 +++++++++
 .../monitor/capacity/TempQueuePerPartition.java | 184 ++++-
 .../rmcontainer/RMContainer.java                |   2 +
 .../rmcontainer/RMContainerImpl.java            |  10 +
 .../scheduler/AbstractYarnScheduler.java        |   2 +-
 .../scheduler/SchedulerApplicationAttempt.java  |   4 +
 .../scheduler/SchedulerNode.java                |  21 +-
 .../scheduler/capacity/CapacityScheduler.java   |   9 +-
 .../CapacitySchedulerConfiguration.java         |  24 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |   1 +
 .../common/fica/FiCaSchedulerNode.java          |   7 +-
 .../scheduler/fair/FSAppAttempt.java            |   1 +
 .../scheduler/fair/FairScheduler.java           |   3 +-
 .../scheduler/fifo/FifoScheduler.java           |   2 +-
 ...alCapacityPreemptionPolicyMockFramework.java | 687 +++++++++++++++++++
 ...estProportionalCapacityPreemptionPolicy.java |  61 +-
 ...pacityPreemptionPolicyForNodePartitions.java | 687 +------------------
 ...tyPreemptionPolicyForReservedContainers.java | 430 ++++++++++++
 ...alCapacityPreemptionPolicyMockFramework.java | 247 +++++++
 .../TestSchedulerApplicationAttempt.java        |   3 +-
 .../CapacitySchedulerPreemptionTestBase.java    | 149 ++++
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../TestCapacitySchedulerLazyPreemption.java    | 639 +++++++++++++++++
 .../TestCapacitySchedulerPreemption.java        | 683 ------------------
 ...TestCapacitySchedulerSurgicalPreemption.java | 246 +++++++
 .../scheduler/fair/TestFairScheduler.java       |   2 +-
 .../scheduler/fifo/TestFifoScheduler.java       |   2 +-
 32 files changed, 3193 insertions(+), 1543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 55ec858..03e180d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 
@@ -84,10 +85,17 @@ public class SchedulingMonitor extends AbstractService {
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
-        //invoke the preemption policy at a regular pace
-        //the policy will generate preemption or kill events
-        //managed by the dispatcher
-        invokePolicy();
+        try {
+          //invoke the preemption policy at a regular pace
+          //the policy will generate preemption or kill events
+          //managed by the dispatcher
+          invokePolicy();
+        } catch (YarnRuntimeException e) {
+          LOG.error("YarnRuntimeException raised while executing preemption"
+              + " checker, skip this run..., exception=", e);
+        }
+
+        // Wait before next run
         try {
           Thread.sleep(monitorInterval);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index a71f108..42d8730 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -40,9 +42,9 @@ public class CapacitySchedulerPreemptionUtils {
 
       //  Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
       if (Resources.greaterThan(context.getResourceCalculator(),
-          clusterResource, qT.actuallyToBePreempted, Resources.none())) {
+          clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
         resToObtainByPartition.put(qT.partition,
-            Resources.clone(qT.actuallyToBePreempted));
+            Resources.clone(qT.getActuallyToBePreempted()));
       }
     }
 
@@ -62,4 +64,33 @@ public class CapacitySchedulerPreemptionUtils {
     }
     return containers.contains(container);
   }
+
+  public static void deductPreemptableResourcesBasedSelectedCandidates(
+      CapacitySchedulerPreemptionContext context,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+    for (Set<RMContainer> containers : selectedCandidates.values()) {
+      for (RMContainer c : containers) {
+        SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode(
+            c.getAllocatedNode());
+        if (null == schedulerNode) {
+          continue;
+        }
+
+        String partition = schedulerNode.getPartition();
+        String queue = c.getQueueName();
+        TempQueuePerPartition tq = context.getQueueByPartition(queue,
+            partition);
+
+        Resource res = c.getReservedResource();
+        if (null == res) {
+          res = c.getAllocatedResource();
+        }
+
+        if (null != res) {
+          tq.deductActuallyToBePreempted(context.getResourceCalculator(),
+              tq.totalPartitionResource, res);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 499d0ff..a8c62fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -55,7 +54,7 @@ public class FifoCandidatesSelector
     super(preemptionContext);
 
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext);
+        preemptionContext, false);
   }
 
   @Override
@@ -66,8 +65,13 @@ public class FifoCandidatesSelector
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptionAllowed);
 
-    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
-        new HashMap<>();
+    // Previous selectors (with higher priority) could have already
+    // selected containers. We need to deduct preemptable resources
+    // based on already selected candidates.
+    CapacitySchedulerPreemptionUtils
+        .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
+            selectedCandidates);
+
     List<RMContainer> skippedAMContainerlist = new ArrayList<>();
 
     // Loop all leaf queues
@@ -109,7 +113,7 @@ public class FifoCandidatesSelector
                 continue;
               }
               boolean preempted = tryPreemptContainerAndDeductResToObtain(
-                  resToObtainByPartition, c, clusterResource, preemptMap,
+                  resToObtainByPartition, c, clusterResource, selectedCandidates,
                   totalPreemptionAllowed);
               if (!preempted) {
                 continue;
@@ -132,7 +136,7 @@ public class FifoCandidatesSelector
           }
 
           preemptFrom(fc, clusterResource, resToObtainByPartition,
-              skippedAMContainerlist, skippedAMSize, preemptMap,
+              skippedAMContainerlist, skippedAMSize, selectedCandidates,
               totalPreemptionAllowed);
         }
 
@@ -144,13 +148,13 @@ public class FifoCandidatesSelector
                 leafQueue.getAbsoluteCapacity()),
             leafQueue.getMaxAMResourcePerQueuePercent());
 
-        preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+        preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
             resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
             totalPreemptionAllowed);
       }
     }
 
-    return preemptMap;
+    return selectedCandidates;
   }
 
   /**
@@ -236,9 +240,9 @@ public class FifoCandidatesSelector
         resourceToObtainByPartitions.remove(nodePartition);
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Marked container=" + rmContainer.getContainerId()
-            + " in partition=" + nodePartition
-            + " to be preemption candidates");
+        LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
+            .getContainerId() + " from partition=" + nodePartition + " queue="
+            + rmContainer.getQueueName() + " to be preemption candidates");
       }
       // Add to preemptMap
       addToPreemptMap(preemptMap, attemptId, rmContainer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 2217210..103b419 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -37,7 +37,7 @@ import java.util.Set;
 
 /**
  * Calculate how much resources need to be preempted for each queue,
- * will be used by {@link FifoCandidatesSelector}
+ * will be used by {@link PreemptionCandidatesSelector}
  */
 public class PreemptableResourceCalculator {
   private static final Log LOG =
@@ -45,6 +45,7 @@ public class PreemptableResourceCalculator {
 
   private final CapacitySchedulerPreemptionContext context;
   private final ResourceCalculator rc;
+  private boolean isReservedPreemptionCandidatesSelector;
 
   static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
@@ -71,18 +72,31 @@ public class PreemptableResourceCalculator {
     // capacity and therefore considered last for resources.
     private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
       double pctOver = Integer.MAX_VALUE;
-      if (q != null && Resources.greaterThan(
-          rc, clusterRes, q.guaranteed, Resources.none())) {
-        pctOver =
-            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      if (q != null && Resources.greaterThan(rc, clusterRes,
+          q.getGuaranteed(),
+          Resources.none())) {
+        pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
+            q.getGuaranteed());
       }
       return (pctOver);
     }
   }
 
-  public PreemptableResourceCalculator(CapacitySchedulerPreemptionContext preemptionContext) {
+  /**
+   * PreemptableResourceCalculator constructor
+   *
+   * @param preemptionContext preemption policy context
+   * @param isReservedPreemptionCandidatesSelector this will be set by
+   * different implementation of candidate selectors, please refer to
+   * TempQueuePerPartition#offer for details.
+   */
+  public PreemptableResourceCalculator(
+      CapacitySchedulerPreemptionContext preemptionContext,
+      boolean isReservedPreemptionCandidatesSelector) {
     context = preemptionContext;
     rc = preemptionContext.getResourceCalculator();
+    this.isReservedPreemptionCandidatesSelector =
+        isReservedPreemptionCandidatesSelector;
   }
 
   /**
@@ -101,11 +115,11 @@ public class PreemptableResourceCalculator {
       }
     } else {
       for (TempQueuePerPartition q : queues) {
-        Resources.addTo(activeCap, q.guaranteed);
+        Resources.addTo(activeCap, q.getGuaranteed());
       }
       for (TempQueuePerPartition q : queues) {
         q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-            q.guaranteed, activeCap);
+            q.getGuaranteed(), activeCap);
       }
     }
   }
@@ -114,7 +128,8 @@ public class PreemptableResourceCalculator {
   // return the list of all queues that have the same idealAssigned
   // percentage of guaranteed.
   protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
-      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+      PriorityQueue<TempQueuePerPartition> orderedByNeed,
+      TQComparator tqComparator) {
     ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
     while (!orderedByNeed.isEmpty()) {
       TempQueuePerPartition q1 = orderedByNeed.remove();
@@ -155,15 +170,19 @@ public class PreemptableResourceCalculator {
         tqComparator);
     for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
       TempQueuePerPartition q = i.next();
-      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
-        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      Resource used = q.getUsed();
+
+      if (Resources.greaterThan(rc, tot_guarant, used,
+          q.getGuaranteed())) {
+        q.idealAssigned = Resources.add(
+            q.getGuaranteed(), q.untouchableExtra);
       } else {
-        q.idealAssigned = Resources.clone(q.current);
+        q.idealAssigned = Resources.clone(used);
       }
       Resources.subtractFrom(unassigned, q.idealAssigned);
-      // If idealAssigned < (current + pending), q needs more resources, so
+      // If idealAssigned < (used + pending), q needs more resources, so
       // add it to the list of underserved queues, ordered by need.
-      Resource curPlusPend = Resources.add(q.current, q.pending);
+      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
       if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
         orderedByNeed.add(q);
       }
@@ -190,7 +209,8 @@ public class PreemptableResourceCalculator {
         TempQueuePerPartition sub = i.next();
         Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
             unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
+            isReservedPreemptionCandidatesSelector);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
 
         if (Resources.greaterThan(rc, tot_guarant,
@@ -234,8 +254,8 @@ public class PreemptableResourceCalculator {
     Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
 
     for (TempQueuePerPartition q : qAlloc) {
-      if (Resources
-          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+      if (Resources.greaterThan(rc, tot_guarant,
+          q.getGuaranteed(), Resources.none())) {
         nonZeroGuarQueues.add(q);
       } else {
         zeroGuarQueues.add(q);
@@ -258,19 +278,22 @@ public class PreemptableResourceCalculator {
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
     for (TempQueuePerPartition t:queues) {
-      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
-        Resources.addTo(totPreemptionNeeded,
-            Resources.subtract(t.current, t.idealAssigned));
+      if (Resources.greaterThan(rc, tot_guarant,
+          t.getUsed(), t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded, Resources
+            .subtract(t.getUsed(), t.idealAssigned));
       }
     }
 
-    // if we need to preempt more than is allowed, compute a factor (0<f<1)
-    // that is used to scale down how much we ask back from each queue
+    /**
+     * if we need to preempt more than is allowed, compute a factor (0<f<1)
+     * that is used to scale down how much we ask back from each queue
+     */
     float scalingFactor = 1.0F;
-    if (Resources.greaterThan(rc, tot_guarant,
-        totPreemptionNeeded, totalPreemptionAllowed)) {
-      scalingFactor = Resources.divide(rc, tot_guarant,
-          totalPreemptionAllowed, totPreemptionNeeded);
+    if (Resources.greaterThan(rc,
+        tot_guarant, totPreemptionNeeded, totalPreemptionAllowed)) {
+      scalingFactor = Resources.divide(rc, tot_guarant, totalPreemptionAllowed,
+          totPreemptionNeeded);
     }
 
     // assign to each queue the amount of actual preemption based on local
@@ -278,12 +301,6 @@ public class PreemptableResourceCalculator {
     for (TempQueuePerPartition t : queues) {
       t.assignPreemption(scalingFactor, rc, tot_guarant);
     }
-    if (LOG.isDebugEnabled()) {
-      for (TempQueuePerPartition t : queues) {
-        LOG.debug(t);
-      }
-    }
-
   }
 
   /**
@@ -329,12 +346,31 @@ public class PreemptableResourceCalculator {
       for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
         // we act only if we are violating balance by more than
         // maxIgnoredOverCapacity
-        if (Resources.greaterThan(rc, clusterResource, qT.current,
-            Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) {
-          // we introduce a dampening factor naturalTerminationFactor that
-          // accounts for natural termination of containers
-          Resource resToObtain = Resources.multiply(qT.toBePreempted,
-              context.getNaturalTerminationFactor());
+        if (Resources.greaterThan(rc, clusterResource,
+            qT.getUsed(), Resources
+                .multiply(qT.getGuaranteed(),
+                    1.0 + context.getMaxIgnoreOverCapacity()))) {
+          /*
+           * We introduce a dampening factor naturalTerminationFactor that
+           * accounts for natural termination of containers.
+           *
+           * This is added to control the pace of preemption. For example:
+           * if the preemption policy calculates that a queue *should be*
+           * preempted by 20 GB and natural_termination_factor is set to 0.1,
+           * the policy will select 20 GB * 0.1 = 2 GB of containers to preempt.
+           *
+           * However, this doesn't work for YARN-4390:
+           * if a queue needs to be preempted by 20 GB for *one single* large
+           * container, preempting 10% of that resource isn't useful.
+           * So, to keep it simple, only apply natural_termination_factor when
+           * the selector is not a reservedPreemptionCandidatesSelector.
+           */
+          Resource resToObtain = qT.toBePreempted;
+          if (!isReservedPreemptionCandidatesSelector) {
+            resToObtain = Resources.multiply(qT.toBePreempted,
+                context.getNaturalTerminationFactor());
+          }
+
           // Only add resToObtain when it >= 0
           if (Resources.greaterThan(rc, clusterResource, resToObtain,
               Resources.none())) {
@@ -343,22 +379,39 @@ public class PreemptableResourceCalculator {
                   + " resource-to-obtain=" + resToObtain);
             }
           }
-          qT.actuallyToBePreempted = Resources.clone(resToObtain);
+          qT.setActuallyToBePreempted(Resources.clone(resToObtain));
         } else {
-          qT.actuallyToBePreempted = Resources.none();
+          qT.setActuallyToBePreempted(Resources.none());
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(qT);
         }
       }
     }
   }
 
+  private void updatePreemptableExtras(TempQueuePerPartition cur) {
+    if (cur.children == null || cur.children.isEmpty()) {
+      cur.updatePreemptableExtras(rc);
+    } else {
+      for (TempQueuePerPartition child : cur.children) {
+        updatePreemptableExtras(child);
+      }
+      cur.updatePreemptableExtras(rc);
+    }
+  }
+
   public void computeIdealAllocation(Resource clusterResource,
       Resource totalPreemptionAllowed) {
     for (String partition : context.getAllPartitions()) {
-      TempQueuePerPartition tRoot =
-          context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      TempQueuePerPartition tRoot = context.getQueueByPartition(
+          CapacitySchedulerConfiguration.ROOT, partition);
+      updatePreemptableExtras(tRoot);
+
       // compute the ideal distribution of resources among queues
       // updates cloned queues state accordingly
-      tRoot.idealAssigned = tRoot.guaranteed;
+      tRoot.idealAssigned = tRoot.getGuaranteed();
       recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 5de64b1..bf19f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.IOException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -173,6 +174,15 @@ public class ProportionalCapacityPreemptionPolicy
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
 
+    // Do we need to specially consider reserved containers?
+    boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
+    if (selectCandidatesForResevedContainers) {
+      candidatesSelectionPolicies.add(
+          new ReservedContainerCandidatesSelector(this));
+    }
+
     // initialize candidates preemption selection policies
     candidatesSelectionPolicies.add(
         new FifoCandidatesSelector(this));
@@ -185,9 +195,15 @@ public class ProportionalCapacityPreemptionPolicy
 
   @Override
   public void editSchedule() {
+    long startTs = clock.getTime();
+
     CSQueue root = scheduler.getRootQueue();
     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
     containerBasedPreemptOrKill(root, clusterResources);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -295,8 +311,8 @@ public class ProportionalCapacityPreemptionPolicy
       queueToPartitions.clear();
 
       for (String partitionToLookAt : allPartitions) {
-        cloneQueues(root,
-            nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+        cloneQueues(root, Resources
+                .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
             partitionToLookAt);
       }
     }
@@ -311,9 +327,15 @@ public class ProportionalCapacityPreemptionPolicy
 
     // based on ideal allocation select containers to be preemptionCandidates from each
     // queue and each application
-    Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null;
+    Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
+        new HashMap<>();
     for (PreemptionCandidatesSelector selector :
         candidatesSelectionPolicies) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(MessageFormat
+            .format("Trying to use {0} to select preemption candidates",
+                selector.getClass().getName()));
+      }
       toPreempt = selector.selectCandidates(toPreempt,
           clusterResources, totalPreemptionAllowed);
     }
@@ -377,14 +399,15 @@ public class ProportionalCapacityPreemptionPolicy
       float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
       boolean preemptionDisabled = curQueue.getPreemptionDisabled();
 
-      Resource current = curQueue.getQueueResourceUsage().getUsed(
-          partitionToLookAt);
-      Resource guaranteed = Resources.multiply(partitionResource, absCap);
-      Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+      Resource current = Resources.clone(
+          curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
       Resource killable = Resources.none();
+
+      Resource reserved = Resources.clone(
+          curQueue.getQueueResourceUsage().getReserved(partitionToLookAt));
       if (null != preemptableQueues.get(queueName)) {
-         killable = preemptableQueues.get(queueName)
-            .getKillableResource(partitionToLookAt);
+        killable = Resources.clone(preemptableQueues.get(queueName)
+            .getKillableResource(partitionToLookAt));
       }
 
       // when partition is a non-exclusive partition, the actual maxCapacity
@@ -392,53 +415,24 @@ public class ProportionalCapacityPreemptionPolicy
       try {
         if (!scheduler.getRMContext().getNodeLabelManager()
             .isExclusiveNodeLabel(partitionToLookAt)) {
-          maxCapacity =
-              Resources.max(rc, partitionResource, maxCapacity, current);
+          absMaxCap = 1.0f;
         }
       } catch (IOException e) {
         // This may cause by partition removed when running capacity monitor,
         // just ignore the error, this will be corrected when doing next check.
       }
 
-      Resource extra = Resource.newInstance(0, 0);
-      if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
-        extra = Resources.subtract(current, guaranteed);
-      }
-      if (curQueue instanceof LeafQueue) {
-        LeafQueue l = (LeafQueue) curQueue;
-        Resource pending =
-              l.getTotalPendingResourcesConsideringUserLimit(
-                  partitionResource, partitionToLookAt);
-        ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
-            maxCapacity, preemptionDisabled, partitionToLookAt, killable);
-        if (preemptionDisabled) {
-          ret.untouchableExtra = extra;
-        } else {
-          ret.preemptableExtra = extra;
-        }
-        ret.setLeafQueue(l);
-      } else {
-        Resource pending = Resource.newInstance(0, 0);
-        ret =
-            new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
-                guaranteed, maxCapacity, false, partitionToLookAt, killable);
-        Resource childrensPreemptable = Resource.newInstance(0, 0);
+      ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
+          partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
+          reserved, curQueue);
+
+      if (curQueue instanceof ParentQueue) {
+        // Recursively add children
         for (CSQueue c : curQueue.getChildQueues()) {
-          TempQueuePerPartition subq =
-              cloneQueues(c, partitionResource, partitionToLookAt);
-          Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+          TempQueuePerPartition subq = cloneQueues(c, partitionResource,
+              partitionToLookAt);
           ret.addChild(subq);
         }
-        // untouchableExtra = max(extra - childrenPreemptable, 0)
-        if (Resources.greaterThanOrEqual(
-              rc, partitionResource, childrensPreemptable, extra)) {
-          ret.untouchableExtra = Resource.newInstance(0, 0);
-        } else {
-          ret.untouchableExtra =
-                Resources.subtract(extra, childrensPreemptable);
-        }
-        ret.preemptableExtra = Resources.min(
-            rc, partitionResource, childrensPreemptable, extra);
       }
     }
     addTempQueuePartition(ret);
@@ -481,7 +475,8 @@ public class ProportionalCapacityPreemptionPolicy
       String partition) {
     Map<String, TempQueuePerPartition> partitionToQueues;
     if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
-      return null;
+      throw new YarnRuntimeException("This shouldn't happen, cannot find "
+          + "TempQueuePerPartition for queueName=" + queueName);
     }
     return partitionToQueues.get(partition);
   }
@@ -492,7 +487,8 @@ public class ProportionalCapacityPreemptionPolicy
   @Override
   public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
     if (!queueToPartitions.containsKey(queueName)) {
-      return null;
+      throw new YarnRuntimeException("This shouldn't happen, cannot find "
+          + "TempQueuePerPartition collection for queueName=" + queueName);
     }
     return queueToPartitions.get(queueName).values();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
new file mode 100644
index 0000000..3900a35
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Selects preemption candidates on behalf of containers that are already
+ * reserved on a node. For every node holding a reserved container it checks
+ * whether preempting running containers on that same node, within the
+ * per-queue and total preemption quotas, would free enough resource for the
+ * reservation, and if so marks those containers as preemption candidates.
+ */
+public class ReservedContainerCandidatesSelector
+    extends PreemptionCandidatesSelector {
+  private static final Log LOG =
+      LogFactory.getLog(ReservedContainerCandidatesSelector.class);
+
+  private final PreemptableResourceCalculator preemptableAmountCalculator;
+
+  /**
+   * A temporary data structure to remember what to preempt on a node.
+   */
+  private static class NodeForPreemption {
+    private final float preemptionCost;
+    private final FiCaSchedulerNode schedulerNode;
+    private final List<RMContainer> selectedContainers;
+
+    public NodeForPreemption(float preemptionCost,
+        FiCaSchedulerNode schedulerNode, List<RMContainer> selectedContainers) {
+      this.preemptionCost = preemptionCost;
+      this.schedulerNode = schedulerNode;
+      this.selectedContainers = selectedContainers;
+    }
+  }
+
+  ReservedContainerCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+    preemptableAmountCalculator = new PreemptableResourceCalculator(
+        preemptionContext, true);
+  }
+
+  /**
+   * Selects containers to preempt so that reserved containers can be
+   * allocated on the nodes holding the reservations.
+   *
+   * @param selectedCandidates containers already selected for preemption,
+   *          keyed by application attempt; containers selected here are
+   *          added to this map in place
+   * @param clusterResource total resource of the cluster
+   * @param totalPreemptedResourceAllowed amount of resource that may still
+   *          be preempted; reduced in place for containers selected here
+   * @return the {@code selectedCandidates} map that was passed in
+   */
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource,
+      Resource totalPreemptedResourceAllowed) {
+    // Calculate how much resources we need to preempt
+    preemptableAmountCalculator.computeIdealAllocation(clusterResource,
+        totalPreemptedResourceAllowed);
+
+    // Get queue to preemptable resource by partition
+    Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition =
+        new HashMap<>();
+    for (String leafQueue : preemptionContext.getLeafQueueNames()) {
+      queueToPreemptableResourceByPartition.put(leafQueue,
+          CapacitySchedulerPreemptionUtils
+              .getResToObtainByPartitionForLeafQueue(preemptionContext,
+                  leafQueue, clusterResource));
+    }
+
+    // Get list of nodes for preemption, ordered by preemption cost. This is
+    // a read-only dry run: no quota is consumed while ranking the nodes.
+    List<NodeForPreemption> nodesForPreemption = getNodesForPreemption(
+        clusterResource, queueToPreemptableResourceByPartition,
+        selectedCandidates, totalPreemptedResourceAllowed);
+
+    for (NodeForPreemption nfp : nodesForPreemption) {
+      // Second pass: actually consume quota and collect the containers.
+      NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode(
+          nfp.schedulerNode, clusterResource,
+          queueToPreemptableResourceByPartition, selectedCandidates,
+          totalPreemptedResourceAllowed, false);
+      if (null == preemptionResult) {
+        continue;
+      }
+
+      for (RMContainer c : preemptionResult.selectedContainers) {
+        ApplicationAttemptId appId = c.getApplicationAttemptId();
+        Set<RMContainer> containers = selectedCandidates.get(appId);
+        if (null == containers) {
+          containers = new HashSet<>();
+          selectedCandidates.put(appId, containers);
+        }
+
+        containers.add(c);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(this.getClass().getName() + " Marked container=" + c
+              .getContainerId() + " from queue=" + c.getQueueName()
+              + " to be preemption candidates");
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+
+  /**
+   * Looks up the remaining preemptable resource of a queue in a partition.
+   *
+   * @return the quota entry stored in the map (callers mutate it in place),
+   *         or null if the map has no entry for the queue or the partition
+   */
+  private Resource getPreemptableResource(String queueName,
+      String partitionName,
+      Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition) {
+    Map<String, Resource> partitionToPreemptable =
+        queueToPreemptableResourceByPartition.get(queueName);
+    if (null == partitionToPreemptable) {
+      return null;
+    }
+
+    return partitionToPreemptable.get(partitionName);
+  }
+
+  /**
+   * Checks whether {@code required} fits into both the queue's remaining
+   * preemptable resource for the partition and the total preemption budget.
+   * Unless {@code readOnly} is set, a successful check also deducts
+   * {@code required} from both quotas.
+   *
+   * @return true if the resource can be preempted from the queue
+   */
+  private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
+      String partitionName,
+      Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+      Resource required, Resource totalPreemptionAllowed, boolean readOnly) {
+    Resource preemptable = getPreemptableResource(queueName, partitionName,
+        queueToPreemptableResourceByPartition);
+    if (null == preemptable) {
+      return false;
+    }
+
+    if (!Resources.fitsIn(rc, cluster, required, preemptable)) {
+      return false;
+    }
+
+    if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) {
+      return false;
+    }
+
+    if (!readOnly) {
+      Resources.subtractFrom(preemptable, required);
+      Resources.subtractFrom(totalPreemptionAllowed, required);
+    }
+    return true;
+  }
+
+  /**
+   * Try to check if we can preempt resources for reserved container in given
+   * node.
+   *
+   * @param node node that holds the reserved container
+   * @param cluster total resource of the cluster
+   * @param queueToPreemptableResourceByPartition it's a map of
+   *          {@code <queueName, <partition, preemptable-resource>>}
+   * @param selectedCandidates containers already selected for preemption;
+   *          they still count as freed resource but are not selected again
+   * @param totalPreemptionAllowed total amount of resource that may still be
+   *          preempted
+   * @param readOnly if true this is a dry run: quotas are left untouched and
+   *          the returned selectedContainers list is empty
+   * @return NodeForPreemption if it's possible to preempt containers on the
+   *         node to satisfy reserved resource, null otherwise
+   */
+  private NodeForPreemption getPreemptionCandidatesOnNode(
+      FiCaSchedulerNode node, Resource cluster,
+      Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptionAllowed, boolean readOnly) {
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null == reservedContainer) {
+      // Callers check for a reservation with a separate
+      // getReservedContainer() call, so it can be gone by the time we read it
+      // here. Without a reservation there is nothing to preempt for.
+      return null;
+    }
+
+    Resource available = Resources.clone(node.getAvailableResource());
+    Resource totalSelected = Resources.createResource(0);
+    List<RMContainer> sortedRunningContainers =
+        node.getCopiedListOfRunningContainers();
+    List<RMContainer> selectedContainers = new ArrayList<>();
+    Map<ContainerId, RMContainer> killableContainers =
+        node.getKillableContainers();
+
+    // Sort running containers by container id in descending order: we prefer
+    // to preempt the most recently launched containers first.
+    Collections.sort(sortedRunningContainers, new Comparator<RMContainer>() {
+      @Override public int compare(RMContainer o1, RMContainer o2) {
+        // Swap the operands instead of negating the result; negation
+        // overflows if compareTo ever returns Integer.MIN_VALUE.
+        return o2.getContainerId().compareTo(o1.getContainerId());
+      }
+    });
+
+    // First check: can we preempt containers to allocate the
+    // reservedContainer?
+    boolean canAllocateReservedContainer = false;
+
+    // At least, we can get available + killable resources from this node
+    Resource cur = Resources.add(available, node.getTotalKillableResources());
+    String partition = node.getPartition();
+
+    // Avoid preempt any container if required <= available + killable
+    if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(),
+        cur)) {
+      return null;
+    }
+
+    for (RMContainer c : sortedRunningContainers) {
+      String containerQueueName = c.getQueueName();
+
+      // Skip container if it is already marked killable
+      if (killableContainers.containsKey(c.getContainerId())) {
+        continue;
+      }
+
+      // An alternative approach is add a "penalty cost" if AM container is
+      // selected. Here for safety, avoid preempt AM container in any cases
+      if (c.isAMContainer()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip selecting AM container on host=" + node.getNodeID()
+              + " AM container=" + c.getContainerId());
+        }
+        continue;
+      }
+
+      // Can we preempt container c?
+      // Check if we have quota to preempt this container
+      boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName,
+          partition, queueToPreemptableResourceByPartition,
+          c.getAllocatedResource(), totalPreemptionAllowed, readOnly);
+
+      // If we can, add to selected container, and change resource accordingly.
+      if (canPreempt) {
+        if (!CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+            selectedCandidates)) {
+          if (!readOnly) {
+            selectedContainers.add(c);
+          }
+          Resources.addTo(totalSelected, c.getAllocatedResource());
+        }
+        Resources.addTo(cur, c.getAllocatedResource());
+        if (Resources.fitsIn(rc, cluster,
+            reservedContainer.getReservedResource(), cur)) {
+          canAllocateReservedContainer = true;
+          break;
+        }
+      }
+    }
+
+    if (!canAllocateReservedContainer) {
+      if (!readOnly) {
+        // Revert the quotas deducted by tryToPreemptFromQueue for the
+        // containers we are not going to preempt after all.
+        for (RMContainer c : selectedContainers) {
+          // The total budget was deducted together with the queue quota and
+          // the same object is handed to later selectors, so give it back too.
+          Resources.addTo(totalPreemptionAllowed, c.getAllocatedResource());
+
+          Resource res = getPreemptableResource(c.getQueueName(), partition,
+              queueToPreemptableResourceByPartition);
+          if (null == res) {
+            // This shouldn't happen in normal cases, one possible cause is
+            // container moved to different queue while executing
+            // preemption logic. Ignore such failures.
+            continue;
+          }
+          Resources.addTo(res, c.getAllocatedResource());
+        }
+      }
+      return null;
+    }
+
+    // Preemption cost of this node: resource that has to be newly preempted,
+    // relative to the size of the reservation it satisfies.
+    float ratio = Resources.ratio(rc, totalSelected,
+        reservedContainer.getReservedResource());
+
+    return new NodeForPreemption(ratio, node, selectedContainers);
+  }
+
+  /**
+   * Finds the nodes on which a reserved container could be satisfied by
+   * preemption. Nodes are evaluated in read-only mode, so no quota is
+   * consumed.
+   *
+   * @return candidate nodes sorted by ascending preemption cost
+   */
+  private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
+      Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptionAllowed) {
+    List<NodeForPreemption> nfps = new ArrayList<>();
+
+    // get nodes have reserved container
+    for (FiCaSchedulerNode node : preemptionContext.getScheduler()
+        .getAllNodes().values()) {
+      if (node.getReservedContainer() != null) {
+        NodeForPreemption nfp = getPreemptionCandidatesOnNode(node, cluster,
+            queueToPreemptableResourceByPartition, selectedCandidates,
+            totalPreemptionAllowed, true);
+        // Null means we cannot preempt containers on the node to satisfy
+        // reserved container
+        if (null != nfp) {
+          nfps.add(nfp);
+        }
+      }
+    }
+
+    // Return sorted node-for-preemptions (by cost)
+    Collections.sort(nfps, new Comparator<NodeForPreemption>() {
+      @Override
+      public int compare(NodeForPreemption o1, NodeForPreemption o2) {
+        return Float.compare(o1.preemptionCost, o2.preemptionCost);
+      }
+    });
+
+    return nfps;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 8b01a73..04ed135 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -32,20 +33,22 @@ import java.util.ArrayList;
 public class TempQueuePerPartition {
   // Following fields are copied from scheduler
   final String queueName;
-  final Resource current;
-  final Resource pending;
-  final Resource guaranteed;
-  final Resource maxCapacity;
-  final Resource killable;
   final String partition;
+  final Resource pending;
+
+  private final Resource current;
+  private final Resource killable;
+  private final Resource reserved;
+  private final float absCapacity;
+  private final float absMaxCapacity;
+  final Resource totalPartitionResource;
 
   // Following fields are setted and used by candidate selection policies
   Resource idealAssigned;
   Resource toBePreempted;
   Resource untouchableExtra;
   Resource preemptableExtra;
-  // For logging purpose
-  Resource actuallyToBePreempted;
+  private Resource actuallyToBePreempted;
 
   double normalizedGuarantee;
 
@@ -53,14 +56,22 @@ public class TempQueuePerPartition {
   LeafQueue leafQueue;
   boolean preemptionDisabled;
 
-  TempQueuePerPartition(String queueName, Resource current, Resource pending,
-      Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
-      String partition, Resource killable) {
+  TempQueuePerPartition(String queueName, Resource current,
+      boolean preemptionDisabled, String partition, Resource killable,
+      float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
+      Resource reserved, CSQueue queue) {
     this.queueName = queueName;
     this.current = current;
-    this.pending = pending;
-    this.guaranteed = guaranteed;
-    this.maxCapacity = maxCapacity;
+
+    if (queue instanceof LeafQueue) {
+      LeafQueue l = (LeafQueue) queue;
+      pending = l.getTotalPendingResourcesConsideringUserLimit(
+          totalPartitionResource, partition);
+      leafQueue = l;
+    } else {
+      pending = Resources.createResource(0);
+    }
+
     this.idealAssigned = Resource.newInstance(0, 0);
     this.actuallyToBePreempted = Resource.newInstance(0, 0);
     this.toBePreempted = Resource.newInstance(0, 0);
@@ -71,6 +82,10 @@ public class TempQueuePerPartition {
     this.preemptionDisabled = preemptionDisabled;
     this.partition = partition;
     this.killable = killable;
+    this.absCapacity = absCapacity;
+    this.absMaxCapacity = absMaxCapacity;
+    this.totalPartitionResource = totalPartitionResource;
+    this.reserved = reserved;
   }
 
   public void setLeafQueue(LeafQueue l) {
@@ -92,31 +107,101 @@ public class TempQueuePerPartition {
     return children;
   }
 
+  public Resource getUsed() {
+    return current;
+  }
+
+  public Resource getUsedDeductReservd() {
+    return Resources.subtract(current, reserved);
+  }
+
   // This function "accepts" all the resources it can (pending) and return
   // the unused ones
   Resource offer(Resource avail, ResourceCalculator rc,
-      Resource clusterResource) {
+      Resource clusterResource, boolean considersReservedResource) {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
-        Resources.subtract(maxCapacity, idealAssigned),
+        Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
     // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
-    Resource accepted =
-        Resources.min(rc, clusterResource,
-            absMaxCapIdealAssignedDelta,
-            Resources.min(rc, clusterResource, avail, Resources.subtract(
-                Resources.add(current, pending), idealAssigned)));
+    Resource accepted = Resources.min(rc, clusterResource,
+        absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail,
+            Resources
+                /*
+                 * When we're using FifoPreemptionSelector
+                 * (considerReservedResource = false).
+                 *
+                 * We should deduct reserved resource to avoid excessive preemption:
+                 *
+                 * For example, if an under-utilized queue has used = reserved = 20.
+                 * Preemption policy will try to preempt 20 containers
+                 * (which is not satisfied) from different hosts.
+                 *
+                 * In FifoPreemptionSelector, there's no guarantee that preempted
+                 * resource can be used by pending request, so policy will preempt
+                 * resources repeatedly.
+                 */
+                .subtract(Resources.add(
+                    (considersReservedResource ? getUsed() :
+                      getUsedDeductReservd()),
+                    pending), idealAssigned)));
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;
   }
 
+  public Resource getGuaranteed() {
+    return Resources.multiply(totalPartitionResource, absCapacity);
+  }
+
+  public Resource getMax() {
+    return Resources.multiply(totalPartitionResource, absMaxCapacity);
+  }
+
+  public void updatePreemptableExtras(ResourceCalculator rc) {
+    // Reset untouchableExtra and preemptableExtra
+    untouchableExtra = Resources.none();
+    preemptableExtra = Resources.none();
+
+    Resource extra = Resources.subtract(getUsed(),
+        getGuaranteed());
+    if (Resources.lessThan(rc, totalPartitionResource, extra,
+        Resources.none())) {
+      extra = Resources.none();
+    }
+
+    if (null == children || children.isEmpty()) {
+      // If it is a leaf queue
+      if (preemptionDisabled) {
+        untouchableExtra = extra;
+      } else {
+        preemptableExtra = extra;
+      }
+    } else {
+      // If it is a parent queue
+      Resource childrensPreemptable = Resource.newInstance(0, 0);
+      for (TempQueuePerPartition child : children) {
+        Resources.addTo(childrensPreemptable, child.preemptableExtra);
+      }
+      // untouchableExtra = max(extra - childrenPreemptable, 0)
+      if (Resources.greaterThanOrEqual(rc, totalPartitionResource,
+          childrensPreemptable, extra)) {
+        untouchableExtra = Resource.newInstance(0, 0);
+      } else {
+        untouchableExtra = Resources.subtract(extra, childrensPreemptable);
+      }
+      preemptableExtra = Resources.min(rc, totalPartitionResource,
+          childrensPreemptable, extra);
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append(" NAME: " + queueName)
         .append(" CUR: ").append(current)
         .append(" PEN: ").append(pending)
-        .append(" GAR: ").append(guaranteed)
+        .append(" RESERVED: ").append(reserved)
+        .append(" GAR: ").append(getGuaranteed())
         .append(" NORM: ").append(normalizedGuarantee)
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
@@ -130,29 +215,60 @@ public class TempQueuePerPartition {
 
   public void assignPreemption(float scalingFactor, ResourceCalculator rc,
       Resource clusterResource) {
-    if (Resources.greaterThan(rc, clusterResource,
-        Resources.subtract(current, killable), idealAssigned)) {
-      toBePreempted = Resources.multiply(Resources
-              .subtract(Resources.subtract(current, killable), idealAssigned),
-          scalingFactor);
+    Resource usedDeductKillable = Resources.subtract(
+        getUsed(), killable);
+    Resource totalResource = Resources.add(getUsed(), pending);
+
+    // The minimum resource that we need to keep for a queue is:
+    // max(idealAssigned, min(used + pending, guaranteed)).
+    //
+    // We do this because the ideal-allocation calculation may not consider
+    // reserved resource, so the computed ideal allocation can be less than
+    // both guaranteed and total (used + pending). We should avoid preempting
+    // from a queue that is already <= its guaranteed resource.
+    Resource minimumQueueResource = Resources.max(rc, clusterResource,
+        Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
+        idealAssigned);
+
+    if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
+        minimumQueueResource)) {
+      toBePreempted = Resources.multiply(
+          Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
     } else {
-      toBePreempted = Resource.newInstance(0, 0);
+      toBePreempted = Resources.none();
+    }
+  }
+
+  public Resource getActuallyToBePreempted() {
+    return actuallyToBePreempted;
+  }
+
+  public void setActuallyToBePreempted(Resource res) {
+    this.actuallyToBePreempted = res;
+  }
+
+  public void deductActuallyToBePreempted(ResourceCalculator rc,
+      Resource cluster, Resource toBeDeduct) {
+    if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
+      Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
     }
+    actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
+        Resources.none());
   }
 
   void appendLogString(StringBuilder sb) {
     sb.append(queueName).append(", ")
-        .append(current.getMemory()).append(", ")
+        .append(current.getMemorySize()).append(", ")
         .append(current.getVirtualCores()).append(", ")
-        .append(pending.getMemory()).append(", ")
+        .append(pending.getMemorySize()).append(", ")
         .append(pending.getVirtualCores()).append(", ")
-        .append(guaranteed.getMemory()).append(", ")
-        .append(guaranteed.getVirtualCores()).append(", ")
-        .append(idealAssigned.getMemory()).append(", ")
+        .append(getGuaranteed().getMemorySize()).append(", ")
+        .append(getGuaranteed().getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemorySize()).append(", ")
         .append(idealAssigned.getVirtualCores()).append(", ")
-        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getMemorySize()).append(", ")
         .append(toBePreempted.getVirtualCores() ).append(", ")
-        .append(actuallyToBePreempted.getMemory()).append(", ")
+        .append(actuallyToBePreempted.getMemorySize()).append(", ")
         .append(actuallyToBePreempted.getVirtualCores());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index dfe0886..f37923f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -89,4 +89,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   boolean hasIncreaseReservation();
   
   void cancelIncreaseReservation();
+
+  String getQueueName();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2d0126b..44589aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -181,6 +181,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   // Only used for container resource increase and decrease. This is the
   // resource to rollback to should container resource increase token expires.
   private Resource lastConfirmedResource;
+  private volatile String queueName;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -817,4 +818,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   public void cancelIncreaseReservation() {
     hasIncreaseReservation = false;
   }
+
+  public void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 864c3a8..47c83bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -687,7 +687,7 @@ public abstract class AbstractYarnScheduler
       updateMaximumAllocation(node, false);
 
       // update resource to node
-      node.setTotalResource(newResource);
+      node.updateTotalResource(newResource);
 
       nodes.put(nm.getNodeID(), (N)node);
       updateMaximumAllocation(node, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 17c60d9..13519a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -403,6 +403,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
               node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
       attemptResourceUsage.incReserved(node.getPartition(),
           container.getResource());
+      ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
       // Reset the re-reservation count
       resetReReservations(priority);
@@ -750,14 +751,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public synchronized void move(Queue newQueue) {
     QueueMetrics oldMetrics = queue.getMetrics();
     QueueMetrics newMetrics = newQueue.getMetrics();
+    String newQueueName = newQueue.getQueueName();
     String user = getUser();
     for (RMContainer liveContainer : liveContainers.values()) {
       Resource resource = liveContainer.getContainer().getResource();
+      ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
       oldMetrics.releaseResources(user, 1, resource);
       newMetrics.allocateResources(user, 1, resource, false);
     }
     for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
       for (RMContainer reservedContainer : map.values()) {
+        ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
         Resource resource = reservedContainer.getReservedResource();
         oldMetrics.unreserveResource(user, resource);
         newMetrics.reserveResource(user, resource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 81ddd11..8a3fd8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -98,12 +98,12 @@ public abstract class SchedulerNode {
    * Set total resources on the node.
    * @param resource total resources on the node.
    */
-  public synchronized void setTotalResource(Resource resource){
+  public synchronized void updateTotalResource(Resource resource){
     this.totalResourceCapability = resource;
     this.availableResource = Resources.subtract(totalResourceCapability,
       this.usedResource);
   }
-  
+
   /**
    * Get the ID of the node which contains both its hostname and port.
    * 
@@ -176,7 +176,7 @@ public abstract class SchedulerNode {
         + " containers, " + getUsedResource() + " used and "
         + getAvailableResource() + " available after allocation");
   }
-  
+
   /**
    * The Scheduler increased container
    */
@@ -184,7 +184,7 @@ public abstract class SchedulerNode {
       Resource deltaResource) {
     changeContainerResource(containerId, deltaResource, true);
   }
-  
+
   /**
    * The Scheduler decreased container
    */
@@ -227,7 +227,8 @@ public abstract class SchedulerNode {
     return false;
   }
 
-  protected synchronized void updateResource(Container container) {
+  protected synchronized void updateResourceForReleasedContainer(
+      Container container) {
     addAvailableResource(container.getResource());
     --numContainers;
   }
@@ -246,7 +247,7 @@ public abstract class SchedulerNode {
 
     /* remove the containers from the nodemanger */
     if (null != launchedContainers.remove(container.getId())) {
-      updateResource(container);
+      updateResourceForReleasedContainer(container);
     }
 
     LOG.info("Released container " + container.getId() + " of capacity "
@@ -303,7 +304,7 @@ public abstract class SchedulerNode {
     return numContainers;
   }
 
-  public synchronized List<RMContainer> getRunningContainers() {
+  public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
     return new ArrayList<RMContainer>(launchedContainers.values());
   }
 
@@ -312,7 +313,7 @@ public abstract class SchedulerNode {
   }
 
   protected synchronized void
-      setReservedContainer(RMContainer reservedContainer) {
+  setReservedContainer(RMContainer reservedContainer) {
     this.reservedContainer = reservedContainer;
   }
 
@@ -330,7 +331,7 @@ public abstract class SchedulerNode {
   public void updateLabels(Set<String> labels) {
     this.labels = labels;
   }
-  
+
   /**
    * Get partition of which the node belongs to, if node-labels of this node is
    * empty or null, it belongs to NO_LABEL partition. And since we only support
@@ -338,7 +339,7 @@ public abstract class SchedulerNode {
    */
   public String getPartition() {
     if (this.labels == null || this.labels.isEmpty()) {
-      return RMNodeLabelsManager.NO_LABEL; 
+      return RMNodeLabelsManager.NO_LABEL;
     } else {
       return this.labels.iterator().next();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5153018..ca4a2fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -1150,7 +1149,7 @@ public class CapacityScheduler extends
     String oldPartition = node.getPartition();
 
     // Update resources of these containers
-    for (RMContainer rmContainer : node.getRunningContainers()) {
+    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
       FiCaSchedulerApp application =
           getApplicationAttempt(rmContainer.getApplicationAttemptId());
       if (null != application) {
@@ -1506,7 +1505,7 @@ public class CapacityScheduler extends
     }
     
     // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
+    List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
     for (RMContainer container : runningContainers) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -1623,9 +1622,9 @@ public class CapacityScheduler extends
   public FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
-  
+ 
   @Lock(Lock.NoLock.class)
-  Map<NodeId, FiCaSchedulerNode> getAllNodes() {
+  public Map<NodeId, FiCaSchedulerNode> getAllNodes() {
     return nodes;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ab3aa7d..6db5074 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1042,21 +1042,24 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
 
+  private static final String PREEMPTION_CONFIG_PREFIX =
+      "yarn.resourcemanager.monitor.capacity.preemption.";
+
   /** If true, run the policy but do not affect the cluster with preemption and
    * kill events. */
   public static final String PREEMPTION_OBSERVE_ONLY =
-      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+      PREEMPTION_CONFIG_PREFIX + "observe_only";
   public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
 
   /** Time in milliseconds between invocations of this policy */
   public static final String PREEMPTION_MONITORING_INTERVAL =
-      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+      PREEMPTION_CONFIG_PREFIX + "monitoring_interval";
   public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
 
   /** Time in milliseconds between requesting a preemption from an application
    * and killing the container. */
   public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
-      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+      PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill";
   public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
 
   /** Maximum percentage of resources preemptionCandidates in a single round. By
@@ -1064,7 +1067,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * reclaimed from the cluster. After computing the total desired preemption,
    * the policy scales it back within this limit. */
   public static final String TOTAL_PREEMPTION_PER_ROUND =
-      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+      PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round";
   public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
 
   /** Maximum amount of resources above the target capacity ignored for
@@ -1073,7 +1076,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * High values would slow the time to capacity and (absent natural
    * completions) it might prevent convergence to guaranteed capacity. */
   public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
-      "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+      PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
   public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
   /**
    * Given a computed preemption target, account for containers naturally
@@ -1083,7 +1086,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * will reclaim almost 95% of resources within 5 * {@link
    * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
   public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
-      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+      PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
   public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
       0.2f;
+
+  /**
+   * When calculating which containers to preempt, we will try to preempt
+   * containers for reserved containers first. The default is false.
+   */
+  public static final String PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+      PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
+  public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+      false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 595b3fe..38995ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -201,6 +201,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         new RMContainerImpl(container, this.getApplicationAttemptId(),
             node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
             request.getNodeLabelExpression());
+    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
     updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 1d0e78a..f90a53c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -142,9 +143,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  protected synchronized void updateResource(
+  protected synchronized void updateResourceForReleasedContainer(
       Container container) {
-    super.updateResource(container);
+    super.updateResourceForReleasedContainer(container);
     if (killableContainers.containsKey(container.getId())) {
       Resources.subtractFrom(totalKillableResources, container.getResource());
       killableContainers.remove(container.getId());
@@ -170,6 +171,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
-    return killableContainers;
+    return Collections.unmodifiableMap(killableContainers);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 4b21f7f..24dcd06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -379,6 +379,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     RMContainer rmContainer = new RMContainerImpl(container, 
         getApplicationAttemptId(), node.getNodeID(),
         appSchedulingInfo.getUser(), rmContext);
+    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d6bc1fd..960339e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -916,7 +916,8 @@ public class FairScheduler extends
     triggerUpdate();
 
     // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
+    List<RMContainer> runningContainers =
+        node.getCopiedListOfRunningContainers();
     for (RMContainer container : runningContainers) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 409a043..664bd4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -937,7 +937,7 @@ public class FifoScheduler extends
       return;
     }
     // Kill running containers
-    for(RMContainer container : node.getRunningContainers()) {
+    for(RMContainer container : node.getCopiedListOfRunningContainers()) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(
               container.getContainerId(), 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/4] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan

Posted by ep...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..07d1eef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestProportionalCapacityPreemptionPolicyMockFramework
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+  @Test
+  public void testBuilder() throws Exception {
+    /**
+     * Test of test, make sure we build expected mock schedulable objects
+     */
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+            "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+            "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1: 50 allocated in n3; app2: 50 allocated + 150 reserved in n2
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (reserved)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("root"), "", 100);
+    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "red", 100);
+    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+    // a
+    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("a"), "", 100);
+    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+    // a1
+    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+    checkPendingResource(cs.getQueue("a1"), "", 100);
+    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a1"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+    // a2
+    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+    checkPendingResource(cs.getQueue("a2"), "", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+    // b
+    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("b"), "", 0);
+    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("b"), "red", 100);
+    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+    // Check ignored partitioned containers in queue
+    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+        .getIgnoreExclusivityRMContainers().get("blue").size());
+
+    // Check applications
+    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+    // Check #containers
+    FiCaSchedulerApp app1 = getApp("a1", 1);
+    FiCaSchedulerApp app2 = getApp("a1", 2);
+    FiCaSchedulerApp app3 = getApp("a2", 3);
+    FiCaSchedulerApp app4 = getApp("b", 4);
+
+    Assert.assertEquals(50, app1.getLiveContainers().size());
+    checkContainerNodesInApp(app1, 50, "n3");
+
+    Assert.assertEquals(50, app2.getLiveContainers().size());
+    Assert.assertEquals(150, app2.getReservedContainers().size());
+    checkContainerNodesInApp(app2, 200, "n2");
+
+    Assert.assertEquals(50, app3.getLiveContainers().size());
+    checkContainerNodesInApp(app3, 50, "n3");
+
+    Assert.assertEquals(100, app4.getLiveContainers().size());
+    checkContainerNodesInApp(app4, 100, "n1");
+  }
+
+  @Test
+  public void testBuilderWithReservedResource() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved (reserved is optional)
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+            "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+            "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (reserved)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkReservedResource(cs.getQueue("root"), "", 100);
+    checkReservedResource(cs.getQueue("root"), "red", 90);
+
+    // a
+    checkReservedResource(cs.getQueue("a"), "", 50);
+    checkReservedResource(cs.getQueue("a"), "red", 40);
+
+    // a1
+    checkReservedResource(cs.getQueue("a1"), "", 40);
+    checkReservedResource(cs.getQueue("a1"), "red", 20);
+
+    // b
+    checkReservedResource(cs.getQueue("b"), "", 0);
+    checkReservedResource(cs.getQueue("b"), "red", 0);
+  }
+
+  @Test
+  public void testBuilderWithSpecifiedNodeResources() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red res=100;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3= res=30"; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved (reserved is optional)
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+            "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+            "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (reserved)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check host resources
+    Assert.assertEquals(3, this.cs.getAllNodes().size());
+    SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
+    Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node1.getReservedContainer());
+
+    SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
+    Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+    Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
+    Assert.assertNotNull(node2.getReservedContainer());
+
+    SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
+    Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node3.getReservedContainer());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 88216f8..54166c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.After;
@@ -144,7 +145,7 @@ public class TestSchedulerApplicationAttempt {
   private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
       Resource resource) {
     ContainerId containerId = ContainerId.newContainerId(appAttId, id);
-    RMContainer rmContainer = mock(RMContainer.class);
+    RMContainer rmContainer = mock(RMContainerImpl.class);
     Container container = mock(Container.class);
     when(container.getResource()).thenReturn(resource);
     when(container.getNodeId()).thenReturn(nodeId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
new file mode 100644
index 0000000..bd9f615
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CapacitySchedulerPreemptionTestBase {
+
+  final int GB = 1024;
+
+  Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations
+    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        60000L);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        mon = (SchedulingMonitor) service;
+        break;
+      }
+    }
+
+    if (mon != null) {
+      return mon.getSchedulingEditPolicy();
+    }
+    return null;
+  }
+
+  public void waitNumberOfLiveContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 10) {
+      System.out.println(app.getLiveContainers().size());
+      if (app.getLiveContainers().size() == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+
+  public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 10) {
+      System.out.println(app.getReservedContainers().size());
+      if (app.getReservedContainers().size() == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+
+  public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
+      ApplicationAttemptId appId, int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 500) {
+      int total = 0;
+      for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+        if (c.getApplicationAttemptId().equals(appId)) {
+          total++;
+        }
+      }
+      if (total == expected) {
+        return;
+      }
+      Thread.sleep(10);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6c9faf7..925e7f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -3371,7 +3371,7 @@ public class TestCapacityScheduler {
           resourceManager
               .getResourceScheduler()
               .getSchedulerNode(resourceEvent.getNodeId())
-              .setTotalResource(resourceEvent.getResourceOption().getResource());
+              .updateTotalResource(resourceEvent.getResourceOption().getResource());
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
new file mode 100644
index 0000000..e2d21c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerLazyPreemption
+    extends CapacitySchedulerPreemptionTestBase {
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+  }
+
+  @Test (timeout = 60000)
+  public void testSimplePreemption() throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
+     * there are no more resources available.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its AM).
+     *
+     * Now the cluster is fully utilized.
+     *
+     * 4) app2 asks for another 1G container; the system will preempt one
+     * container from app1, and app2 will receive the preempted container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(1, killableContainers.size());
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 6 containers, and app2 has 2 containers
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersNodeLocalityDelay()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with locality specified, so it must
+     * wait for missed opportunities before it gets scheduled. Check that the
+     * system waits for them before killing the killable container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container with unknown host and unknown rack
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, one container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersHardNodeLocality()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for 1G container with hard locality specified, and
+     *         the requested host does not exist
+     * Confirm system doesn't preempt any container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container for h3 with hard locality,
+    // h3 doesn't exist in the cluster
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1, true), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h3",
+            Resources.createResource(1 * GB), 1, false), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1, false)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, nothing will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 7 containers, and app2 has 1 containers (no container allocated)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Killable containers resources will be excluded from PCPP (no duplicated
+     *    container added to killable list)
+     * 2) When more resources need to be preempted, new containers will be selected
+     *    and killable containers will be considered
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+    // Check killable containers and to-be-preempted containers in edit policy
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Run edit schedule again, confirm status doesn't change
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Save current to kill containers
+    Set<ContainerId> previousKillableContainers = new HashSet<>(
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+            .keySet());
+
+    // Update request resource of c from 1 to 2, so we need to preempt
+    // one more container
+    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+    // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
+    // and 1 container in killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+    // Call editPolicy.editSchedule() once more, we should have 2 containers in killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Check that previous killable containers are included in the new ones
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+    Assert.assertTrue(
+        Sets.difference(previousKillableContainers, killableContainers.keySet())
+            .isEmpty());
+  }
+
+  /*
+   * Ignore this test now because it could be a premature optimization
+   */
+  @Ignore
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Containers will be marked to killable
+     * 2) Cancel resource request
+     * 3) Killable containers will be cancelled from policy and scheduler
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if 3 container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+    // Change request from 3G to 2G, now we can preempt one less container. (3->2)
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    // Call editSchedule once more to make sure still nothing happens
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersUserLimit()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
+     * more resource available.
+     *
+     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+     *
+     * Now the cluster is fulfilled.
+     *
+     * 4) app2 asks for another 1G container; it exceeds queue-c's user limit,
+     * so no container of app1 is marked killable and nothing is preempted
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
+    MockRM rm1 = new MockRM(csConf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if no container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // No preemption happens
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+    Assert.assertEquals(0, killableContainers.size());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  private Map<ContainerId, RMContainer> waitKillableContainersSize(
+      PreemptionManager pm, String queueName, String partition,
+      int expectedSize) throws InterruptedException {
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap(queueName, partition);
+
+    int wait = 0;
+    // Wait for at most 5 sec (it should be super fast actually)
+    while (expectedSize != killableContainers.size() && wait < 500) {
+      killableContainers = pm.getKillableContainersMap(queueName, partition);
+      Thread.sleep(10);
+      wait++;
+    }
+
+    Assert.assertEquals(expectedSize, killableContainers.size());
+    return killableContainers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index 001899d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
-  private static final Log LOG = LogFactory.getLog(
-      TestCapacitySchedulerPreemption.class);
-
-  private final int GB = 1024;
-
-  private Configuration conf;
-
-  RMNodeLabelsManager mgr;
-
-  Clock clock;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
-    // Set preemption related configurations
-    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
-        0);
-    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
-        true);
-    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
-        1.0f);
-    conf.setFloat(
-        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
-        1.0f);
-    mgr = new NullRMNodeLabelsManager();
-    mgr.init(this.conf);
-    clock = mock(Clock.class);
-    when(clock.getTime()).thenReturn(0L);
-  }
-
-  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
-    RMActiveServices activeServices = rm.getRMActiveService();
-    SchedulingMonitor mon = null;
-    for (Service service : activeServices.getServices()) {
-      if (service instanceof SchedulingMonitor) {
-        mon = (SchedulingMonitor) service;
-        break;
-      }
-    }
-
-    if (mon != null) {
-      return mon.getSchedulingEditPolicy();
-    }
-    return null;
-  }
-
-  @Test (timeout = 60000)
-  public void testSimplePreemption() throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(1, killableContainers.size());
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 6 containers, and app2 has 2 containers
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersNodeLocalityDelay()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with locality specified, so it needs
-     * to wait for missed-opportunity before get scheduled.
-     * Check if system waits missed-opportunity before finish killable container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // AM asks for a 1 * GB container with unknown host and unknown rack
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "unknownhost",
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, one container will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersHardNodeLocality()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for a 1G container with hard locality specified, and
-     *         the requested host (h3) does not exist in the cluster.
-     * Confirm the system doesn't preempt any container: one container of app1
-     * still shows up in queue a's killable map after the edit policy runs,
-     * but app1 keeps 7 live containers and app2 keeps 1 after node updates.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // Launch app1 in queue a; its AM container is launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Send 3 node updates for node1, then 3 node updates for node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c, asking for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // app2's AM asks for a 1 * GB container on h3 with hard locality;
-    // h3 doesn't exist in the cluster (only h1 and h2 are registered)
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1, true), ResourceRequest
-        .newInstance(Priority.newInstance(1), "h3",
-            Resources.createResource(1 * GB), 1, false), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1, false)), null);
-
-    // Get the scheduling edit policy
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check that one container from app1 is
-    // marked as "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see whether a container gets preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 container (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, still nothing will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 7 containers, and app2 has 1 container (no container allocated)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Resources of already-killable containers are excluded from PCPP (no
-     *    duplicated container is added to the killable list, and the
-     *    to-preempt set stays empty on repeated runs)
-     * 2) When more resources need to be preempted, new containers are
-     *    selected and the previously killable containers stay killable
-     *
-     * NOTE(review): unlike the sibling tests, this test never calls
-     * rm1.close() - confirm whether that is intentional.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // Launch app1 in queue a; its AM container is launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Send 6 node updates for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c, asking for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get the edit policy, cast so getToPreemptContainers() is accessible
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check that one container from app1 is
-    // marked as "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-    // The killable container must no longer be listed as to-be-preempted
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Run edit schedule again, confirm the status doesn't change
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Snapshot the ids of the currently killable containers of queue a
-    Set<ContainerId> previousKillableContainers = new HashSet<>(
-        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-            .keySet());
-
-    // Raise app2's request from 1 to 2 containers of 1G, so we need to
-    // preempt one more container
-    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-    // Call editPolicy.editSchedule() once, we should have 1 container in the
-    // to-preempt map and 1 container in the killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-    // Call editSchedule() once more: to-preempt drains, 2 containers killable
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Check that the previously killable containers are still killable
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-    Assert.assertTrue(
-        Sets.difference(previousKillableContainers, killableContainers.keySet())
-            .isEmpty());
-  }
-
-  /*
-   * Ignore this test now because it could be a premature optimization
-   */
-  @Ignore
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Containers will be marked as killable
-     * 2) Shrink the resource request (3G -> 2G)
-     * 3) Surplus killable containers will be cancelled from policy and
-     *    scheduler (killable count drops from 3 to 2)
-     *
-     * NOTE(review): unlike the sibling tests, this test never calls
-     * rm1.close() - confirm whether that is intentional.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // Launch app1 in queue a; its AM container is launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Send 6 node updates for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c, asking for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get the edit policy, cast so getToPreemptContainers() is accessible
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check that 3 containers from app1 are
-    // marked as "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
-    // Change request from 3G to 2G, so one less container is needed (3->2)
-    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
-    // Call editSchedule once more to make sure nothing changes any further
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersUserLimit()
-      throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate
-     * more than one container in queue-c
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fully used.
-     *
-     * 4) app2 asks for another 1G container. The test expects that no
-     * container of app1 becomes killable or gets preempted: queue a's
-     * killable map stays empty, app1 keeps 7 containers and app2 keeps 1.
-     */
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
-    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-    MockRM rm1 = new MockRM(csConf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // Launch app1 in queue a; its AM container is launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Send 3 rounds of node updates, alternating between node1 and node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c, asking for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // app2's AM asks for one more 1 * GB container, on any host
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get the scheduling edit policy
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check that no container from app1 is
-    // marked as "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    // No preemption happens
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-    Assert.assertEquals(0, killableContainers.size());
-
-    // Call CS.handle once to see whether a container gets preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 container (nothing preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  private Map<ContainerId, RMContainer> waitKillableContainersSize(
-      PreemptionManager pm, String queueName, String partition,
-      int expectedSize) throws InterruptedException {
-    Map<ContainerId, RMContainer> killableContainers =
-        pm.getKillableContainersMap(queueName, partition);
-
-    int wait = 0;
-    /* Poll the killable map of (queueName, partition) until it has
-     * expectedSize entries: up to 500 rounds, 10 ms apart (at most ~5 sec,
-     * it should be super fast actually). The map is re-read before each
-     * sleep, so the value asserted below is the one fetched before the last
-     * sleep. The final assert fails the test if the size never matches;
-     * otherwise the last map read is returned to the caller. */
-    while (expectedSize != killableContainers.size() && wait < 500) {
-      killableContainers = pm.getKillableContainersMap(queueName, partition);
-      Thread.sleep(10);
-      wait++;
-    }
-
-    Assert.assertEquals(expectedSize, killableContainers.size());
-    return killableContainers;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org