Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/02/25 01:50:22 UTC

[09/31] hadoop git commit: YARN-4648. Move preemption related tests from TestFairScheduler to TestFairSchedulerPreemption. Contributed by Kai Sasaki.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0e12114c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 6f759ce..5bdcc08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,18 +18,31 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,6 +50,11 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -84,7 +102,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     conf = null;
   }
 
-  private void startResourceManager(float utilizationThreshold) {
+  private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
         utilizationThreshold);
     resourceManager = new MockRM(conf);
@@ -98,6 +116,51 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.updateInterval = 60 * 1000;
   }
 
+  // YARN-4648: The startup code for the ResourceManager mock originated in
+  // TestFairScheduler. It should be kept as it was to guarantee that the
+  // preemption behaviour of the ResourceManager does not change.
+  private void startResourceManagerWithRealFairScheduler() {
+    scheduler = new FairScheduler();
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+            ResourceScheduler.class);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+            1024);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    conf.setFloat(
+            FairSchedulerConfiguration
+                    .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+            TEST_RESERVATION_THRESHOLD);
+
+    resourceManager = new MockRM(conf);
+
+    // TODO: This test should really be using MockRM. For now, start only
+    // the bare minimum that is needed.
+    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+    resourceManager.getRMContext().getStateStore().start();
+
+    // to initialize the master key
+    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+    scheduler.setRMContext(resourceManager.getRMContext());
+  }
+
+  private void stopResourceManager() {
+    if (scheduler != null) {
+      scheduler.stop();
+      scheduler = null;
+    }
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.shutdown();
+  }
+
   private void registerNodeAndSubmitApp(
       int memory, int vcores, int appContainers, int appMemory) {
     RMNode node1 = MockNodes.newNodeInfo(
@@ -143,7 +206,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     out.println("</allocations>");
     out.close();
 
-    startResourceManager(0f);
+    startResourceManagerWithStubbedFairScheduler(0f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
 
@@ -159,7 +222,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     resourceManager.stop();
 
-    startResourceManager(0.8f);
+    startResourceManagerWithStubbedFairScheduler(0.8f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
 
@@ -175,7 +238,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     resourceManager.stop();
 
-    startResourceManager(0.7f);
+    startResourceManagerWithStubbedFairScheduler(0.7f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
 
@@ -189,4 +252,1226 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals("preemptResources() should have been called", 1024,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
   }
+
+  @Test (timeout = 5000)
+  /**
+   * Make sure containers are chosen to be preempted in the correct order.
+   */
+  public void testChoiceOfPreemptedContainers() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"default\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create two nodes
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    // Submit two applications each to queueA and queueB
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
+
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
+
+    scheduler.update();
+
+    scheduler.getQueueManager().getLeafQueue("queueA", true)
+            .setPolicy(SchedulingPolicy.parse("fifo"));
+    scheduler.getQueueManager().getLeafQueue("queueB", true)
+            .setPolicy(SchedulingPolicy.parse("fair"));
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+    for (int i = 0; i < 4; i++) {
+      scheduler.handle(nodeUpdate1);
+      scheduler.handle(nodeUpdate2);
+    }
+
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
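+    // At this point the two 4 GB nodes are fully allocated: queueA and queueB
+    // each hold four 1 GB containers.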
+
+    // Now new requests arrive from queueC and default
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    scheduler.update();
+
+    // We should be able to claw back one container from queueA and queueB each.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // First verify we are adding containers to preemption list for the app.
+    // For queueA (fifo), app2 is selected.
+    // For queueB (fair), app4 is selected.
+    assertTrue("App2 should have container to be preempted",
+            !Collections.disjoint(
+                    scheduler.getSchedulerApp(app2).getLiveContainers(),
+                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App4 should have container to be preempted",
+            !Collections.disjoint(
+                    scheduler.getSchedulerApp(app2).getLiveContainers(),
+                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+
+    // Pretend 15 seconds have passed
+    clock.tickSec(15);
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+    // At this point the containers should have been killed (since we are not simulating AM)
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    // Inside each app, containers are sorted according to their priorities.
+    // Containers with priority 4 are preempted for app2 and app4.
+    Set<RMContainer> set = new HashSet<RMContainer>();
+    for (RMContainer container :
+            scheduler.getSchedulerApp(app2).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    for (RMContainer container :
+            scheduler.getSchedulerApp(app4).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    assertTrue("Containers with priority=4 in app2 and app4 should be " +
+            "preempted.", set.isEmpty());
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+    // Pretend 15 seconds have passed
+    clock.tickSec(15);
+
+    // We should be able to claw back another container from A and B each.
+    // For queueA (fifo), continue preempting from app2.
+    // For queueB (fair), even though app4 has the lowest-priority container
+    // (p=4), preemption still targets app3 because app3 is the most over its fair share.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+    // Now A and B are below fair share, so preemption shouldn't do anything
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertTrue("App1 should have no container to be preempted",
+            scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+    assertTrue("App2 should have no container to be preempted",
+            scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+    assertTrue("App3 should have no container to be preempted",
+            scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+    assertTrue("App4 should have no container to be preempted",
+            scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
+    stopResourceManager();
+  }
+
+  @Test
+  public void testPreemptionIsNotDelayedToNextRound() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>8</weight>");
+    out.println("<queue name=\"queueA1\" />");
+    out.println("<queue name=\"queueA2\" />");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>2</weight>");
+    out.println("</queue>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node of 8G
+    RMNode node1 = MockNodes.newNodeInfo(1,
+            Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Run apps in queueA.A1 and queueB
+    ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
+            "queueA.queueA1", "user1", 7, 1);
+    // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+    ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
+            "user2", 1, 1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 8; i++) {
+      scheduler.handle(nodeUpdate1);
+    }
+
+    // Verify that the apps got the containers they requested
+    assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+    // Now submit an app in queueA.queueA2
+    ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
+            "queueA.queueA2", "user3", 7, 1);
+    scheduler.update();
+
+    // Let 11 sec pass
+    clock.tickSec(11);
+
+    scheduler.update();
+    Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
+            .getLeafQueue("queueA.queueA2", false), clock.getTime());
+    assertEquals(3277, toPreempt.getMemory());
+
+    // Verify that the 3 containers required by queueA2 are preempted in the
+    // same round
+    scheduler.preemptResources(toPreempt);
+    assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+            .size());
+    stopResourceManager();
+  }
+
+  @Test (timeout = 5000)
+  /**
+   * Tests the timing of the decision to preempt tasks.
+   */
+  public void testPreemptionDecision() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create three nodes
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
+                    "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Queues A and B each request three containers
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+    ApplicationAttemptId app5 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+    ApplicationAttemptId app6 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+    }
+
+    // Now new requests arrive from queues C and D
+    ApplicationAttemptId app7 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    ApplicationAttemptId app8 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    ApplicationAttemptId app9 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    ApplicationAttemptId app10 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+    ApplicationAttemptId app11 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+    ApplicationAttemptId app12 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+    scheduler.update();
+
+    FSLeafQueue schedC =
+            scheduler.getQueueManager().getLeafQueue("queueC", true);
+    FSLeafQueue schedD =
+            scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
+    // After minSharePreemptionTime has passed, they should want to preempt min
+    // share.
+    clock.tickSec(6);
+    assertEquals(
+            1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+    // After fairSharePreemptionTime has passed, they should want to preempt
+    // fair share.
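+    // (With three 2 GB nodes and four equally weighted active queues, each
+    // queue's fair share is roughly 6144 MB / 4 = 1536 MB; queueC and queueD
+    // hold nothing, so their deficit equals the full fair share.)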
+    scheduler.update();
+    clock.tickSec(6);
+    assertEquals(
+            1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
+    assertEquals(
+            1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+    stopResourceManager();
+  }
+
+  @Test
+  /**
+   * Tests the timing of the decision to preempt tasks when DRF is used.
+   */
+  public void testPreemptionDecisionWithDRF() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,1vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,2vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,3vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,2vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create three nodes
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
+                    "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Queues A and B each request three containers
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+    ApplicationAttemptId app5 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+    ApplicationAttemptId app6 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+    }
+
+    // Now new requests arrive from queues C and D
+    ApplicationAttemptId app7 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    ApplicationAttemptId app8 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    ApplicationAttemptId app9 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    ApplicationAttemptId app10 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
+    ApplicationAttemptId app11 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
+    ApplicationAttemptId app12 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
+
+    scheduler.update();
+
+    FSLeafQueue schedC =
+            scheduler.getQueueManager().getLeafQueue("queueC", true);
+    FSLeafQueue schedD =
+            scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
+
+    // Test:
+    // 1) whether the componentwise min works as expected.
+    // 2) that the DRF calculator is used.
+
+    // After minSharePreemptionTime has passed, they should want to preempt min
+    // share.
+    clock.tickSec(6);
+    Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
+    assertEquals(1024, res.getMemory());
+    // Demand = 3
+    assertEquals(3, res.getVirtualCores());
+
+    res = scheduler.resourceDeficit(schedD, clock.getTime());
+    assertEquals(1024, res.getMemory());
+    // Demand = 6, but min share = 2
+    assertEquals(2, res.getVirtualCores());
+
+    // After fairSharePreemptionTime has passed, they should want to preempt
+    // fair share.
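+    // (With three 2 GB / 4-vcore nodes and four equally weighted active queues,
+    // each queue's fair share under DRF works out to roughly 1536 MB and 3
+    // vcores, which is what the deficits below are expected to reflect.)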
+    scheduler.update();
+    clock.tickSec(6);
+    res = scheduler.resourceDeficit(schedC, clock.getTime());
+    assertEquals(1536, res.getMemory());
+    assertEquals(3, res.getVirtualCores());
+
+    res = scheduler.resourceDeficit(schedD, clock.getTime());
+    assertEquals(1536, res.getMemory());
+    // Demand = 6, but fair share = 3
+    assertEquals(3, res.getVirtualCores());
+    stopResourceManager();
+  }
+
+  @Test
+  /**
+   * Tests the various timings of the decision to preempt tasks.
+   */
+  public void testPreemptionDecisionWithVariousTimeout() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>2</weight>");
+    out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
+    out.println("<queue name=\"queueB1\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
+    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Check the min/fair share preemption timeout for each queue
+    QueueManager queueMgr = scheduler.getQueueManager();
+    assertEquals(30000, queueMgr.getQueue("root")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("default")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueA")
+            .getFairSharePreemptionTimeout());
+    assertEquals(25000, queueMgr.getQueue("queueB")
+            .getFairSharePreemptionTimeout());
+    assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
+            .getFairSharePreemptionTimeout());
+    assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueC")
+            .getFairSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("root")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("default")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueA")
+            .getMinSharePreemptionTimeout());
+    assertEquals(10000, queueMgr.getQueue("queueB")
+            .getMinSharePreemptionTimeout());
+    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+            .getMinSharePreemptionTimeout());
+    assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueC")
+            .getMinSharePreemptionTimeout());
+
+    // Create one big node
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A takes all resources
+    for (int i = 0; i < 6; i ++) {
+      createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    }
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 6; i++) {
+      scheduler.handle(nodeUpdate1);
+    }
+
+    // Now new requests arrive from queues B1, B2 and C
+    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
+    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    scheduler.update();
+
+    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
+    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
+    FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
+
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
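+    // (Only queueA, queueB, and queueC are active, so with the 6144 MB node the
+    // instantaneous fair shares are presumably about 1536 MB for queueA and
+    // queueC and 3072 MB for queueB, split as 1536 MB each between queueB1 and
+    // queueB2; the fair-share deficits asserted below match those numbers.)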
+
+    // After 5 seconds, queueB1 wants to preempt min share
+    scheduler.update();
+    clock.tickSec(6);
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+    // After 10 seconds, queueB2 wants to preempt min share
+    scheduler.update();
+    clock.tickSec(5);
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+    // After 15 seconds, queueC wants to preempt min share
+    scheduler.update();
+    clock.tickSec(5);
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+    // After 20 seconds, queueB2 should want to preempt fair share
+    scheduler.update();
+    clock.tickSec(5);
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+    // After 25 seconds, queueB1 should want to preempt fair share
+    scheduler.update();
+    clock.tickSec(5);
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+    // After 30 seconds, queueC should want to preempt fair share
+    scheduler.update();
+    clock.tickSec(5);
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+    assertEquals(
+            1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+    stopResourceManager();
+  }
+
+  @Test
+  /**
+   * Tests that the decision to preempt tasks respects non-preemptable queues.
+   * 1. Queues are as follows:
+   *   queueA (non-preemptable)
+   *   queueB (preemptable)
+   *   parentQueue (non-preemptable)
+   *     --queueC (preemptable)
+   *   queueD (preemptable)
+   * 2. Submit requests to queueA, queueB, and queueC; all of them go over their min share.
+   * 3. Now all resources are occupied.
+   * 4. Submit a request to queueD, which needs to preempt resources from other queues.
+   * 5. Only the preemptable queue (queueB) is preempted.
+   */
+  public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue\">");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes (3 GB each)
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+                    "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+                    "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Submit apps to queueA, queueB, and queueC;
+    // now all resources of the cluster are occupied
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
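+    // The cluster's 12 GB (four 3 GB nodes) is now fully allocated, with four
+    // 1 GB containers each in queueA, queueB, and parentQueue.queueC.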
+
+    // Now new requests arrive from queueD
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
+    scheduler.update();
+    FSLeafQueue schedD =
+            scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+    // After the min share preemption timeout has passed, queueD's deficit equals its
+    // 2048 MB minResources, which should be preempted from queueB (the only preemptable queue)
+    clock.tickSec(6);
+    assertEquals(2048,
+            scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // now only app2 is selected to be preempted
+    assertTrue("App2 should have container to be preempted",
+            !Collections.disjoint(
+                    scheduler.getSchedulerApp(app2).getLiveContainers(),
+                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App1 should not have container to be preempted",
+            Collections.disjoint(
+                    scheduler.getSchedulerApp(app1).getLiveContainers(),
+                    scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+    assertTrue("App3 should not have container to be preempted",
+            Collections.disjoint(
+                    scheduler.getSchedulerApp(app3).getLiveContainers(),
+                    scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+    // Pretend 20 seconds have passed
+    clock.tickSec(20);
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+    // after preemption
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    stopResourceManager();
+  }
+
+  @Test
+  /**
+   * Tests the decision to preempt tasks when allowPreemptionFrom is set to false
+   * on all queues; in that case none of them is actually preempted.
+   * 1. Queues are as follows:
+   *   queueA (non-preemptable)
+   *   queueB (non-preemptable)
+   *   parentQueue1 (non-preemptable)
+   *     --queueC (preemptable)
+   *   parentQueue2 (preemptable)
+   *     --queueD (non-preemptable)
+   * 2. Submit requests to queueB, queueC, and queueD; all of them go over their min share.
+   * 3. Now all resources are occupied.
+   * 4. Submit a request to queueA, which needs to preempt resources from other queues.
+   * 5. None of the queues is preempted.
+   */
+  public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
+          throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue1\">");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue2\">");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes (3 GB each)
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+                    "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+            MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+                    "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Submit apps to queueB, queueC, and queueD;
+    // now all resources of the cluster are occupied
+
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // Now new requests arrive from queueA
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    scheduler.update();
+    FSLeafQueue schedA =
+            scheduler.getQueueManager().getLeafQueue("queueA", true);
+
+    // After minSharePreemptionTime has passed, resource deficit is 2G
+    clock.tickSec(6);
+    assertEquals(2048,
+            scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
+
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // Now no app is selected to be preempted
+    assertTrue("App1 should not have container to be preempted",
+            Collections.disjoint(
+                    scheduler.getSchedulerApp(app1).getLiveContainers(),
+                    scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+    assertTrue("App2 should not have container to be preempted",
+            Collections.disjoint(
+                    scheduler.getSchedulerApp(app2).getLiveContainers(),
+                    scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App3 should not have container to be preempted",
+            Collections.disjoint(
+                    scheduler.getSchedulerApp(app3).getLiveContainers(),
+                    scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+    // Pretend 20 seconds have passed
+    clock.tickSec(20);
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+    // after preemption
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    stopResourceManager();
+  }
+
+  @Test
+  public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<queue name=\"queueB1\">");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
+    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
+    out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Check the min/fair share preemption timeout for each queue
+    QueueManager queueMgr = scheduler.getQueueManager();
+    assertEquals(30000, queueMgr.getQueue("root")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("default")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueA")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
+            .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueC")
+            .getFairSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("root")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("default")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueA")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB")
+            .getMinSharePreemptionTimeout());
+    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
+            .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueC")
+            .getMinSharePreemptionTimeout());
+
+    // If both the old global fairSharePreemptionTimeout and
+    // defaultFairSharePreemptionTimeout exist, we take the default one
+    out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<queue name=\"queueB1\">");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
+    out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
+    out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    assertEquals(25000, queueMgr.getQueue("root")
+            .getFairSharePreemptionTimeout());
+    stopResourceManager();
+  }
+
+  @Test(timeout = 5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    startResourceManagerWithRealFairScheduler();
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
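+    // With only a 10 ms wait before kill, the second warnOrKillContainer call
+    // below (made after a 5 second clock tick) is expected to kill the
+    // container rather than just warn.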
+
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise a node-added event
+    RMNode node = MockNodes.newNodeInfo(1,
+            Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 container requests and place them in the ask list
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+            priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+            node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+            ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+            "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+            .size());
+    SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Wait for a few clock ticks
+    clock.tickSec(5);
+
+    // Preempt now; the wait time before kill has elapsed, so this call kills the container
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Trigger container rescheduled event
+    scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
+            SchedulerEventType.KILL_PREEMPTED_CONTAINER));
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, the resource requests will be present again in the app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+              app.getResourceRequest(priority, request.getResourceName())
+                      .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+            Collections.<ResourceRequest> emptyList(),
+            Collections.<ContainerId> emptyList(), null, null, null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+    stopResourceManager();
+  }
 }