You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/06/16 02:07:10 UTC

[1/5] hadoop git commit: Y-5182. v1

Repository: hadoop
Updated Branches:
  refs/heads/fs-preemption [created] fe5bf79db


Y-5182. v1


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec5b5ec9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec5b5ec9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec5b5ec9

Branch: refs/heads/fs-preemption
Commit: ec5b5ec90c9966adfa0634562b5b7230da08a109
Parents: 93d8a7f
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon May 30 23:28:10 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon May 30 23:28:10 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec5b5ec9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index a88abe7..630a8db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -57,8 +57,7 @@ public class MockNodes {
         if (j == (nodesPerRack - 1)) {
           // One unhealthy node per rack.
           list.add(nodeInfo(i, perNode, NodeState.UNHEALTHY));
-        }
-        if (j == 0) {
+        } else if (j == 0) {
           // One node with label
           list.add(nodeInfo(i, perNode, NodeState.RUNNING, ImmutableSet.of("x")));
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/5] hadoop git commit: FS preemption changes WiP

Posted by ka...@apache.org.
FS preemption changes WiP

- Initial set of classes and structure for preemption
- Rename a few files and change starvedApps data structure
- Removed a bunch of preemption code. All tests except TestFairSchedulerPreemption pass.
- Pass checkStarvation down to FSLeafQueue
- Identify starved applications - FSLeafQueue changes
- Identify starved apps - FSAppAttempt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4eec258
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4eec258
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4eec258

Branch: refs/heads/fs-preemption
Commit: e4eec2585333d8bcd77e8b39e2f792358be92831
Parents: d781c25
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sat Mar 5 09:31:28 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue May 31 00:49:56 2016 -0700

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            |  64 ++++-
 .../scheduler/fair/FSContext.java               |  92 +++++++
 .../scheduler/fair/FSLeafQueue.java             | 169 +++++++------
 .../scheduler/fair/FSParentQueue.java           |  11 +-
 .../scheduler/fair/FSPreemptionThread.java      | 152 ++++++++++++
 .../resourcemanager/scheduler/fair/FSQueue.java |  15 +-
 .../scheduler/fair/FairScheduler.java           | 244 +++++--------------
 .../scheduler/fair/QueueManager.java            |  19 +-
 .../scheduler/fair/TestFSLeafQueue.java         |  19 +-
 .../scheduler/fair/TestFSParentQueue.java       |   6 +-
 .../fair/TestFairSchedulerPreemption.java       |  39 +--
 .../fair/TestMaxRunningAppsEnforcer.java        |   7 +-
 .../scheduler/fair/TestQueueManager.java        |   6 +-
 .../webapp/dao/TestFairSchedulerQueueInfo.java  |   2 +-
 14 files changed, 521 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5b83c9a..5065881 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -77,10 +77,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private ResourceWeights resourceWeights;
   private Resource demand = Resources.createResource(0);
   private FairScheduler scheduler;
+  private FSQueue fsQueue;
   private Resource fairShare = Resources.createResource(0, 0);
-  private Resource preemptedResources = Resources.createResource(0);
   private RMContainerComparator comparator = new RMContainerComparator();
-  private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
+  // Preemption related variables
+  private Resource preemptedResources = Resources.createResource(0);
+  private final Set<RMContainer> containersToPreempt = new HashSet<>();
+  private long lastTimeAtFairShare;
 
   // Used to record node reservation by an app.
   // Key = RackName, Value = Set of Nodes reserved by app on rack
@@ -106,7 +110,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
 
     this.scheduler = scheduler;
+    this.fsQueue = queue;
     this.startTime = scheduler.getClock().getTime();
+    this.lastTimeAtFairShare = this.startTime;
     this.priority = Priority.newInstance(1);
     this.resourceWeights = new ResourceWeights();
   }
@@ -145,6 +151,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     // Remove from the list of containers
     liveContainers.remove(rmContainer.getContainerId());
+    containersToPreempt.remove(rmContainer);
 
     Resource containerResource = rmContainer.getContainer().getResource();
     RMAuditLogger.logSuccess(getUser(), 
@@ -155,9 +162,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     this.attemptResourceUsage.decUsed(containerResource);
 
-    // remove from preemption map if it is completed
-    preemptionMap.remove(rmContainer);
-
     // Clear resource utilization metrics cache.
     lastMemoryAggregateAllocationUpdateTime = -1;
   }
@@ -423,18 +427,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   // related methods
-  public void addPreemption(RMContainer container, long time) {
-    assert preemptionMap.get(container) == null;
-    preemptionMap.put(container, time);
+  public void addPreemption(RMContainer container) {
+    containersToPreempt.add(container);
     Resources.addTo(preemptedResources, container.getAllocatedResource());
   }
 
-  public Long getContainerPreemptionTime(RMContainer container) {
-    return preemptionMap.get(container);
-  }
-
   public Set<RMContainer> getPreemptionContainers() {
-    return preemptionMap.keySet();
+    return containersToPreempt;
   }
   
   @Override
@@ -479,6 +478,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
+   * Reserve a spot on this node for a ResourceRequest that would fit in the
+   * containerSize provided.
+   */
+  public boolean reserve(FSSchedulerNode node, Resource containerSize) {
+    return false;
+  }
+
+  /**
    * Reserve a spot for {@code container} on this {@code node}. If
    * the container is {@code alreadyReserved} on the node, simply
    * update relevant bookeeping. This dispatches ro relevant handlers
@@ -859,6 +866,37 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
   }
 
+  /**
+   * Helper method that computes the extent of fairshare starvation.
+   */
+  Resource fairShareStarvation() {
+    Resource threshold = Resources.multiply(
+        getFairShare(), fsQueue.getFairSharePreemptionThreshold());
+    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+
+    long now = scheduler.getClock().getTime();
+    boolean starved = Resources.greaterThan(
+        fsQueue.getPolicy().getResourceCalculator(),
+        scheduler.getClusterResource(), starvation, Resources.none());
+
+    if (!starved) {
+      lastTimeAtFairShare = now;
+    }
+
+    if (starved &&
+        (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
+      // Queue is starved for longer than preemption-timeout
+      return starvation;
+    } else {
+      return Resources.none();
+    }
+  }
+
+  public ResourceRequest getNextResourceRequest() {
+    // TODO (KK): Return highest priority resource request
+    return null;
+  }
+
   /* Schedulable methods implementation */
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
new file mode 100644
index 0000000..eccbd2d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Helper class that holds basic information to be passed around
+ * FairScheduler classes.
+ */
+public class FSContext {
+  private FairScheduler scheduler;
+  private QueueManager queueManager;
+  private Clock clock;
+
+  // Preemption-related info
+  private boolean preemptionEnabled = false;
+  private float preemptionUtilizationThreshold;
+  private PriorityBlockingQueue<FSAppAttempt> starvedApps;
+
+  public FairScheduler getScheduler() {
+    return scheduler;
+  }
+
+  public void setScheduler(
+      FairScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  public Resource getClusterResource() {
+    return scheduler.getClusterResource();
+  }
+
+  public QueueManager getQueueManager() {
+    return queueManager;
+  }
+
+  public void setQueueManager(
+      QueueManager queueManager) {
+    this.queueManager = queueManager;
+  }
+
+  public Clock getClock() {
+    return clock;
+  }
+
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  public boolean isPreemptionEnabled() {
+    return preemptionEnabled;
+  }
+
+  public void setPreemptionEnabled() {
+    this.preemptionEnabled = true;
+    if (starvedApps == null) {
+      starvedApps = new PriorityBlockingQueue<>();
+    }
+  }
+
+  public float getPreemptionUtilizationThreshold() {
+    return preemptionUtilizationThreshold;
+  }
+
+  public void setPreemptionUtilizationThreshold(
+      float preemptionUtilizationThreshold) {
+    this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
+  }
+
+  public PriorityBlockingQueue<FSAppAttempt> getStarvedApps() {
+    return starvedApps;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index a398906..bc2a7c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -52,10 +52,9 @@ public class FSLeafQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSLeafQueue.class.getName());
 
-  private final List<FSAppAttempt> runnableApps = // apps that are runnable
-      new ArrayList<FSAppAttempt>();
-  private final List<FSAppAttempt> nonRunnableApps =
-      new ArrayList<FSAppAttempt>();
+  // apps that are runnable
+  private final List<FSAppAttempt> runnableApps = new ArrayList<>();
+  private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
   // get a lock with fair distribution for app list updates
   private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
   private final Lock readLock = rwl.readLock();
@@ -65,19 +64,16 @@ public class FSLeafQueue extends FSQueue {
   
   // Variables used for preemption
   private long lastTimeAtMinShare;
-  private long lastTimeAtFairShareThreshold;
-  
+
   // Track the AM resource usage for this queue
   private Resource amResourceUsage;
 
   private final ActiveUsersManager activeUsersManager;
   public static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
 
-  public FSLeafQueue(String name, FairScheduler scheduler,
-      FSParentQueue parent) {
-    super(name, scheduler, parent);
-    this.lastTimeAtMinShare = scheduler.getClock().getTime();
-    this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
+  public FSLeafQueue(FSContext context, FSParentQueue parent, String name) {
+    super(context, parent, name);
+    this.lastTimeAtMinShare = context.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
     amResourceUsage = Resource.newInstance(0, 0);
   }
@@ -224,17 +220,70 @@ public class FSLeafQueue extends FSQueue {
     }
     super.policy = policy;
   }
-  
+
   @Override
-  public void recomputeShares() {
+  public void updateInternal(boolean checkStarvation) {
     readLock.lock();
     try {
       policy.computeShares(runnableApps, getFairShare());
+      if (checkStarvation) {
+        identifyStarvedApplications();
+      }
     } finally {
       readLock.unlock();
     }
   }
 
+  /**
+   * Helper method to identify starved applications. This needs to be called
+   * ONLY from {@link #updateInternal}, after the application shares
+   * are updated.
+   *
+   * A queue can be starving due to fairshare or minshare.
+   *
+   * Minshare is defined only on the queue and not the applications.
+   * Fairshare is defined for both the queue and the applications.
+   *
+   * If this queue is starved due to minshare, we need to identify the most
+   * deserving apps if they themselves are not starved due to fairshare.
+   *
+   * If this queue is starving due to fairshare, there must be at least
+   * one application that is starved. And, even if the queue is not
+   * starved due to fairshare, there might still be starved applications.
+   */
+  private void identifyStarvedApplications() {
+    // First identify starved applications and track total amount of
+    // starvation (in resources)
+    Resource fairShareStarvation = Resources.clone(Resources.none());
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
+    for (FSAppAttempt app : appsWithDemand) {
+      Resource appStarvation = app.fairShareStarvation();
+      if (Resources.equals(Resources.none(), appStarvation))  {
+        break;
+      } else {
+        context.getStarvedApps().add(app);
+        Resources.addTo(fairShareStarvation, appStarvation);
+      }
+    }
+
+    // Compute extent of minshare starvation
+    Resource minShareStarvation = minShareStarvation();
+
+    // Compute minshare starvation that is not subsumed by fairshare starvation
+    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+
+    // Keep adding apps to the starved list until the unmet demand goes over
+    // the remaining minshare
+    for (FSAppAttempt app : appsWithDemand) {
+      if (Resources.greaterThan(policy.getResourceCalculator(),
+          context.getClusterResource(), minShareStarvation, Resources.none())) {
+        context.getStarvedApps().add(app);
+        Resources.subtractFrom(minShareStarvation,
+            Resources.subtract(app.getDemand(), app.getResourceUsage()));
+      }
+    }
+  }
+
   @Override
   public Resource getDemand() {
     return demand;
@@ -317,21 +366,7 @@ public class FSLeafQueue extends FSQueue {
       return assigned;
     }
 
-    // Apps that have resource demands.
-    TreeSet<FSAppAttempt> pendingForResourceApps =
-        new TreeSet<FSAppAttempt>(policy.getComparator());
-    readLock.lock();
-    try {
-      for (FSAppAttempt app : runnableApps) {
-        Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(Resources.none())) {
-          pendingForResourceApps.add(app);
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-    for (FSAppAttempt sched : pendingForResourceApps) {
+    for (FSAppAttempt sched : fetchAppsWithDemand()) {
       if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
         continue;
       }
@@ -347,6 +382,23 @@ public class FSLeafQueue extends FSQueue {
     return assigned;
   }
 
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+    TreeSet<FSAppAttempt> pendingForResourceApps =
+        new TreeSet<>(policy.getComparator());
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resource pending = app.getAppAttemptResourceUsage().getPending();
+        if (!pending.equals(Resources.none())) {
+          pendingForResourceApps.add(app);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return pendingForResourceApps;
+  }
+
   @Override
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
@@ -412,15 +464,6 @@ public class FSLeafQueue extends FSQueue {
     this.lastTimeAtMinShare = lastTimeAtMinShare;
   }
 
-  public long getLastTimeAtFairShareThreshold() {
-    return lastTimeAtFairShareThreshold;
-  }
-
-  private void setLastTimeAtFairShareThreshold(
-      long lastTimeAtFairShareThreshold) {
-    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
-  }
-
   @Override
   public int getNumRunnableApps() {
     readLock.lock();
@@ -525,20 +568,6 @@ public class FSLeafQueue extends FSQueue {
     // TODO Auto-generated method stub
   }
 
-  /**
-   * Update the preemption fields for the queue, i.e. the times since last was
-   * at its guaranteed share and over its fair share threshold.
-   */
-  public void updateStarvationStats() {
-    long now = scheduler.getClock().getTime();
-    if (!isStarvedForMinShare()) {
-      setLastTimeAtMinShare(now);
-    }
-    if (!isStarvedForFairShare()) {
-      setLastTimeAtFairShareThreshold(now);
-    }
-  }
-
   /** Allows setting weight for a dynamically created queue
    * Currently only used for reservation based queues
    * @param weight queue weight
@@ -558,28 +587,24 @@ public class FSLeafQueue extends FSQueue {
         getFairShare());
   }
 
-  /**
-   * Is a queue being starved for its min share.
-   */
-  @VisibleForTesting
-  boolean isStarvedForMinShare() {
-    return isStarved(getMinShare());
-  }
+  private Resource minShareStarvation() {
+    Resource desiredShare = Resources.min(policy.getResourceCalculator(),
+        scheduler.getClusterResource(), getMinShare(), getDemand());
 
-  /**
-   * Is a queue being starved for its fair share threshold.
-   */
-  @VisibleForTesting
-  boolean isStarvedForFairShare() {
-    return isStarved(
-        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
-  }
+    Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
+    boolean starved = Resources.greaterThan(policy.getResourceCalculator(),
+        scheduler.getClusterResource(), starvation, Resources.none());
 
-  private boolean isStarved(Resource share) {
-    Resource desiredShare = Resources.min(policy.getResourceCalculator(),
-            scheduler.getClusterResource(), share, getDemand());
-    Resource resourceUsage = getResourceUsage();
-    return Resources.lessThan(policy.getResourceCalculator(),
-            scheduler.getClusterResource(), resourceUsage, desiredShare);
+    long now = context.getClock().getTime();
+    if (!starved) {
+      setLastTimeAtMinShare(now);
+    }
+
+    if (starved &&
+        (now - lastTimeAtMinShare > getMinSharePreemptionTimeout())) {
+      return starvation;
+    } else {
+      return Resources.none();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 035c60c..79c6e1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -56,9 +56,8 @@ public class FSParentQueue extends FSQueue {
   private Lock readLock = rwLock.readLock();
   private Lock writeLock = rwLock.writeLock();
 
-  public FSParentQueue(String name, FairScheduler scheduler,
-      FSParentQueue parent) {
-    super(name, scheduler, parent);
+  public FSParentQueue(FSContext context, FSParentQueue parent, String name) {
+    super(context, parent, name);
   }
   
   public void addChildQueue(FSQueue child) {
@@ -80,13 +79,13 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public void recomputeShares() {
+  public void updateInternal(boolean checkStarvation) {
     readLock.lock();
     try {
       policy.computeShares(childQueues, getFairShare());
       for (FSQueue childQueue : childQueues) {
         childQueue.getMetrics().setFairShare(childQueue.getFairShare());
-        childQueue.recomputeShares();
+        childQueue.updateInternal(checkStarvation);
       }
     } finally {
       readLock.unlock();
@@ -304,7 +303,7 @@ public class FSParentQueue extends FSQueue {
     }
     super.policy = policy;
   }
-  
+
   public void incrementRunnableApps() {
     writeLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
new file mode 100644
index 0000000..0e99b64
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Thread that handles FairScheduler preemption
+ */
+public class FSPreemptionThread extends Thread {
+  private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
+  private final FSContext context;
+  private final FairScheduler scheduler;
+  private final long warnTimeBeforeKill;
+  private final Timer preemptionTimer;
+
+  public FSPreemptionThread(FSContext context) {
+    this.context = context;
+    this.scheduler = context.getScheduler();
+    FairSchedulerConfiguration fsConf = scheduler.getConf();
+    context.setPreemptionEnabled();
+    context.setPreemptionUtilizationThreshold(
+        fsConf.getPreemptionUtilizationThreshold());
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    preemptionTimer = new Timer("Preemption Timer", true);
+
+    setDaemon(true);
+    setName("FSPreemptionThread");
+  }
+
+  public void run() {
+    while (!Thread.interrupted()) {
+      FSAppAttempt starvedApp;
+      try{
+        starvedApp = context.getStarvedApps().take();
+      } catch (InterruptedException e) {
+        LOG.info("Preemption thread interrupted! Exiting.");
+        return;
+      }
+      List<RMContainer> containers = identifyContainersToPreempt(starvedApp);
+      if (containers != null) {
+        preemptContainers(containers);
+      }
+    }
+  }
+
+  /**
+   * Returns a non-null PremptionContext if it finds a node that can
+   * accommodate a request from this app. Also, reserves the node for this app.
+   */
+  private List<RMContainer> identifyContainersToPreempt(FSAppAttempt starvedApp) {
+    List<RMContainer> containers = new ArrayList<>();
+    ResourceRequest request = starvedApp.getNextResourceRequest();
+    Resource requestCapability = request.getCapability();
+    List<FSSchedulerNode> nodes =
+        scheduler.getNodeTracker().getNodes(request.getResourceName());
+    FSSchedulerNode targetNode = null;
+    Resource potential = Resources.clone(Resources.none());
+    for (FSSchedulerNode node : nodes) {
+      containers.clear();
+      potential = Resources.clone(Resources.none());
+      for (RMContainer container : node.getCopiedListOfRunningContainers()) {
+        Resource containerResource = container.getAllocatedResource();
+        FSAppAttempt app =
+            scheduler.getSchedulerApp(container.getApplicationAttemptId());
+        if (Resources.fitsIn(containerResource,
+            Resources.subtract(app.getResourceUsage(), app.getFairShare()))) {
+          Resources.addTo(potential, containerResource);
+        }
+        if (Resources.fitsIn(requestCapability, potential)) {
+          break;
+        }
+      }
+      if (Resources.fitsIn(requestCapability, potential)) {
+        targetNode = node;
+        break;
+      }
+    }
+
+    if (Resources.fitsIn(requestCapability, potential)) {
+      starvedApp.reserve(targetNode, requestCapability);
+      return containers;
+    } else {
+      return null;
+    }
+  }
+
+  public void preemptContainers(List<RMContainer> containers) {
+    // Warn application about containers to be killed
+    for (RMContainer container : containers) {
+      ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+      FSAppAttempt app = context.getScheduler().getSchedulerApp(appAttemptId);
+      FSLeafQueue queue = app.getQueue();
+      LOG.info("Preempting container " + container +
+          " from queue " + queue.getName());
+      app.addPreemption(container);
+    }
+
+    // Schedule timer task to kill containers
+    preemptionTimer.schedule(
+        new PreemptContainersTask(containers), warnTimeBeforeKill);
+  }
+
+  private class PreemptContainersTask extends TimerTask {
+    private List<RMContainer> containers;
+
+    PreemptContainersTask(List<RMContainer> containers) {
+      this.containers = containers;
+    }
+
+    @Override
+    public void run() {
+      for (RMContainer container : containers) {
+        ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(
+            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+        LOG.info("Killing container " + container);
+        context.getScheduler().completedContainer(
+            container, status, RMContainerEventType.KILL);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index f50c358..32184fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -50,6 +50,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private Resource fairShare = Resources.createResource(0, 0);
   private Resource steadyFairShare = Resources.createResource(0, 0);
   private final String name;
+  protected final FSContext context;
   protected final FairScheduler scheduler;
   private final FSQueueMetrics metrics;
   
@@ -64,9 +65,10 @@ public abstract class FSQueue implements Queue, Schedulable {
   private float fairSharePreemptionThreshold = 0.5f;
   private boolean preemptable = true;
 
-  public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
+  public FSQueue(FSContext context, FSParentQueue parent, String name) {
     this.name = name;
-    this.scheduler = scheduler;
+    this.context = context;
+    this.scheduler = context.getScheduler();
     this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
     metrics.setMinShare(getMinShare());
     metrics.setMaxShare(getMaxShare());
@@ -246,9 +248,14 @@ public abstract class FSQueue implements Queue, Schedulable {
 
   /**
    * Recomputes the shares for all child queues and applications based on this
-   * queue's current share
+   * queue's current share, and checks for starvation.
    */
-  public abstract void recomputeShares();
+  public abstract void updateInternal(boolean checkStarvation);
+
+  public void update(Resource fairShare, boolean checkStarvation) {
+    setFairShare(fairShare);
+    updateInternal(checkStarvation);
+  }
 
   /**
    * Update the min/fair share preemption timeouts, threshold and preemption

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index c8e8406..c3684ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -127,9 +127,9 @@ public class FairScheduler extends
     AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
   private FairSchedulerConfiguration conf;
 
+  private FSContext context;
   private Resource incrAllocation;
   private QueueManager queueMgr;
-  private volatile Clock clock;
   private boolean usePortForNodeName;
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@@ -155,6 +155,9 @@ public class FairScheduler extends
 
   @VisibleForTesting
   Thread schedulingThread;
+
+  Thread preemptionThread;
+
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
@@ -162,25 +165,6 @@ public class FairScheduler extends
   FSQueueMetrics rootMetrics;
   FSOpDurations fsOpDurations;
 
-  // Time when we last updated preemption vars
-  protected long lastPreemptionUpdateTime;
-  // Time we last ran preemptTasksIfNecessary
-  private long lastPreemptCheckTime;
-
-  // Preemption related variables
-  protected boolean preemptionEnabled;
-  protected float preemptionUtilizationThreshold;
-
-  // How often tasks are preempted
-  protected long preemptionInterval; 
-  
-  // ms to wait before force killing stuff (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long waitTimeBeforeKill; 
-  
-  // Containers whose AMs have been warned that they will be preempted soon.
-  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
 
@@ -214,12 +198,24 @@ public class FairScheduler extends
 
   public FairScheduler() {
     super(FairScheduler.class.getName());
-    clock = SystemClock.getInstance();
+
+    context = new FSContext();
+    context.setScheduler(this);
+
+    context.setClock(SystemClock.getInstance());
     allocsLoader = new AllocationFileLoaderService();
-    queueMgr = new QueueManager(this);
+
+    queueMgr = new QueueManager(context);
+    context.setQueueManager(queueMgr);
+
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
 
+  @VisibleForTesting
+  public FSContext getContext() {
+    return context;
+  }
+
   public boolean isAtLeastReservationThreshold(
       ResourceCalculator resourceCalculator, Resource resource) {
     return Resources.greaterThanOrEqual(resourceCalculator,
@@ -300,7 +296,6 @@ public class FairScheduler extends
           }
           long start = getClock().getTime();
           update();
-          preemptTasksIfNecessary();
           long duration = getClock().getTime() - start;
           fsOpDurations.addUpdateThreadRunDuration(duration);
         } catch (InterruptedException ie) {
@@ -340,24 +335,22 @@ public class FairScheduler extends
    */
   protected synchronized void update() {
     long start = getClock().getTime();
-    updateStarvationStats(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
 
     // Recursively update demands for all queues
     rootQueue.updateDemand();
 
-    Resource clusterResource = getClusterResource();
-    rootQueue.setFairShare(clusterResource);
-    // Recursively compute fair shares for all queues
-    // and update metrics
-    rootQueue.recomputeShares();
+    // Update fairshares and starvation stats.
+    rootQueue.update(getClusterResource(), shouldAttemptPreemption());
+
+    // Update metrics
     updateRootQueueMetrics();
 
     if (LOG.isDebugEnabled()) {
       if (--updatesToSkipForDebug < 0) {
         updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
-        LOG.debug("Cluster Capacity: " + clusterResource +
+        LOG.debug("Cluster Capacity: " + getClusterResource() +
             "  Allocations: " + rootMetrics.getAllocatedResources() +
             "  Availability: " + Resource.newInstance(
             rootMetrics.getAvailableMB(),
@@ -371,144 +364,6 @@ public class FairScheduler extends
   }
 
   /**
-   * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and over its fair share
-   * threshold for each type of task.
-   */
-  private void updateStarvationStats() {
-    lastPreemptionUpdateTime = clock.getTime();
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      sched.updateStarvationStats();
-    }
-  }
-
-  /**
-   * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below their fair share threshold for the fairSharePreemptionTimeout. If
-   * such queues exist, compute how many tasks of each type need to be preempted
-   * and then select the right ones using preemptTasks.
-   */
-  protected synchronized void preemptTasksIfNecessary() {
-    if (!shouldAttemptPreemption()) {
-      return;
-    }
-
-    long curTime = getClock().getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval) {
-      return;
-    }
-    lastPreemptCheckTime = curTime;
-
-    Resource resToPreempt = Resources.clone(Resources.none());
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
-    }
-    if (isResourceGreaterThanNone(resToPreempt)) {
-      preemptResources(resToPreempt);
-    }
-  }
-
-  /**
-   * Preempt a quantity of resources. Each round, we start from the root queue,
-   * level-by-level, until choosing a candidate application.
-   * The policy for prioritizing preemption for each queue depends on its
-   * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
-   * most over its fair share; (2) FIFO, choose the childSchedulable that is
-   * latest launched.
-   * Inside each application, we further prioritize preemption by choosing
-   * containers with lowest priority to preempt.
-   * We make sure that no queue is placed below its fair share in the process.
-   */
-  protected void preemptResources(Resource toPreempt) {
-    long start = getClock().getTime();
-    if (Resources.equals(toPreempt, Resources.none())) {
-      return;
-    }
-
-    // Scan down the list of containers we've already warned and kill them
-    // if we need to.  Remove any containers from the list that we don't need
-    // or that are no longer running.
-    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
-    while (warnedIter.hasNext()) {
-      RMContainer container = warnedIter.next();
-      if ((container.getState() == RMContainerState.RUNNING ||
-              container.getState() == RMContainerState.ALLOCATED) &&
-              isResourceGreaterThanNone(toPreempt)) {
-        warnOrKillContainer(container);
-        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
-      } else {
-        warnedIter.remove();
-      }
-    }
-
-    try {
-      // Reset preemptedResource for each app
-      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
-        queue.resetPreemptedResources();
-      }
-
-      while (isResourceGreaterThanNone(toPreempt)) {
-        RMContainer container =
-            getQueueManager().getRootQueue().preemptContainer();
-        if (container == null) {
-          break;
-        } else {
-          warnOrKillContainer(container);
-          warnedContainers.add(container);
-          Resources.subtractFrom(
-              toPreempt, container.getContainer().getResource());
-        }
-      }
-    } finally {
-      // Clear preemptedResources for each app
-      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
-        queue.clearPreemptedResources();
-      }
-    }
-
-    long duration = getClock().getTime() - start;
-    fsOpDurations.addPreemptCallDuration(duration);
-  }
-
-  private boolean isResourceGreaterThanNone(Resource toPreempt) {
-    return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
-  }
-
-  protected void warnOrKillContainer(RMContainer container) {
-    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
-    FSAppAttempt app = getSchedulerApp(appAttemptId);
-    FSLeafQueue queue = app.getQueue();
-    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-        "res=" + container.getContainer().getResource() +
-        ") from queue " + queue.getName());
-    
-    Long time = app.getContainerPreemptionTime(container);
-
-    if (time != null) {
-      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
-      // proceed with kill
-      if (time + waitTimeBeforeKill < getClock().getTime()) {
-        ContainerStatus status =
-          SchedulerUtils.createPreemptedContainerStatus(
-            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        super.completedContainer(container, status, RMContainerEventType.KILL);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Killing container" + container +
-                  " (after waiting for preemption for " +
-                  (getClock().getTime() - time) + "ms)");
-        }
-      }
-    } else {
-      // track the request in the FSAppAttempt itself
-      app.addPreemption(container, getClock().getTime());
-    }
-  }
-
-  /**
    * Return the resource amount that this queue is allowed to preempt, if any.
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
@@ -531,12 +386,12 @@ public class FairScheduler extends
       resDueToMinShare = Resources.max(calc, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
-      Resource target = Resources.componentwiseMin(
-              sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(calc, clusterResource,
-          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
-    }
+//    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
+//      Resource target = Resources.componentwiseMin(
+//              sched.getFairShare(), sched.getDemand());
+//      resDueToFairShare = Resources.max(calc, clusterResource,
+//          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+//    }
     Resource deficit = Resources.max(calc, clusterResource,
         resDueToMinShare, resDueToFairShare);
     if (Resources.greaterThan(calc, clusterResource,
@@ -604,12 +459,13 @@ public class FairScheduler extends
   }
 
   public Clock getClock() {
-    return clock;
+    return context.getClock();
   }
 
   @VisibleForTesting
+  @Deprecated // use FSContext.setClock
   void setClock(Clock clock) {
-    this.clock = clock;
+    context.setClock(clock);
   }
 
   public FairSchedulerEventLog getEventLog() {
@@ -1208,15 +1064,22 @@ public class FairScheduler extends
    * Check if preemption is enabled and the utilization threshold for
    * preemption is met.
    *
+   * TODO (KK): Should we handle the case where usage is less than preemption
+   * threshold, but there are applications requesting resources on nodes that
+   * are otherwise occupied by long running applications over their
+   * fairshare? What if they are occupied by applications not over their
+   * fairshare? Does this mean YARN should not allocate all resources on a
+   * node to long-running services?
+   *
    * @return true if preemption should be attempted, false otherwise.
    */
   private boolean shouldAttemptPreemption() {
-    if (preemptionEnabled) {
-      Resource clusterResource = getClusterResource();
-      return (preemptionUtilizationThreshold < Math.max(
-          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
+    if (context.isPreemptionEnabled()) {
+      return (context.getPreemptionUtilizationThreshold() < Math.max(
+          (float) rootMetrics.getAllocatedMB() /
+              getClusterResource().getMemorySize(),
           (float) rootMetrics.getAllocatedVirtualCores() /
-              clusterResource.getVirtualCores()));
+              getClusterResource().getVirtualCores()));
     }
     return false;
   }
@@ -1400,15 +1263,10 @@ public class FairScheduler extends
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
       rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      preemptionUtilizationThreshold =
-          this.conf.getPreemptionUtilizationThreshold();
       assignMultiple = this.conf.getAssignMultiple();
       maxAssignDynamic = this.conf.isMaxAssignDynamic();
       maxAssign = this.conf.getMaxAssign();
       sizeBasedWeight = this.conf.getSizeBasedWeight();
-      preemptionInterval = this.conf.getPreemptionInterval();
-      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       reservableNodesRatio = this.conf.getReservableNodes();
 
@@ -1425,8 +1283,7 @@ public class FairScheduler extends
       fsOpDurations = FSOpDurations.getInstance(true);
 
       // This stores per-application scheduling information
-      this.applications = new ConcurrentHashMap<
-          ApplicationId, SchedulerApplication<FSAppAttempt>>();
+      this.applications = new ConcurrentHashMap<>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1447,6 +1304,10 @@ public class FairScheduler extends
         schedulingThread.setName("FairSchedulerContinuousScheduling");
         schedulingThread.setDaemon(true);
       }
+
+      if (this.conf.getPreemptionEnabled()) {
+        preemptionThread = new FSPreemptionThread(context);
+      }
     }
 
     allocsLoader.init(conf);
@@ -1477,6 +1338,9 @@ public class FairScheduler extends
       Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
       schedulingThread.start();
     }
+    if (preemptionThread != null) {
+      preemptionThread.start();
+    }
     allocsLoader.start();
   }
 
@@ -1505,6 +1369,10 @@ public class FairScheduler extends
           schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
         }
       }
+      if (preemptionThread != null) {
+        preemptionThread.interrupt();
+        preemptionThread.join(THREAD_JOIN_TIMEOUT_MS);
+      }
       if (allocsLoader != null) {
         allocsLoader.stop();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index aeadcf6..c7d368c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -50,16 +50,16 @@ public class QueueManager {
     QueueManager.class.getName());
 
   public static final String ROOT_QUEUE = "root";
-  
-  private final FairScheduler scheduler;
+
+  private final FSContext context;
 
   private final Collection<FSLeafQueue> leafQueues = 
       new CopyOnWriteArrayList<FSLeafQueue>();
   private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
   private FSParentQueue rootQueue;
 
-  public QueueManager(FairScheduler scheduler) {
-    this.scheduler = scheduler;
+  public QueueManager(FSContext context) {
+    this.context = context;
   }
   
   public FSParentQueue getRootQueue() {
@@ -68,7 +68,7 @@ public class QueueManager {
 
   public void initialize(Configuration conf) throws IOException,
       SAXException, AllocationConfigurationException, ParserConfigurationException {
-    rootQueue = new FSParentQueue("root", scheduler, null);
+    rootQueue = new FSParentQueue(context, null, "root");
     queues.put(rootQueue.getName(), rootQueue);
     
     // Create the default queue
@@ -215,12 +215,13 @@ public class QueueManager {
     // queue to create.
     // Now that we know everything worked out, make all the queues
     // and add them to the map.
-    AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
+    AllocationConfiguration queueConf =
+        context.getScheduler().getAllocationConfiguration();
     FSLeafQueue leafQueue = null;
     for (int i = newQueueNames.size()-1; i >= 0; i--) {
       String queueName = newQueueNames.get(i);
       if (i == 0 && queueType != FSQueueType.PARENT) {
-        leafQueue = new FSLeafQueue(name, scheduler, parent);
+        leafQueue = new FSLeafQueue(context, parent, name);
         try {
           leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
         } catch (AllocationConfigurationException ex) {
@@ -233,7 +234,7 @@ public class QueueManager {
         leafQueue.updatePreemptionVariables();
         return leafQueue;
       } else {
-        FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
+        FSParentQueue newParent = new FSParentQueue(context, parent, queueName);
         try {
           newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
         } catch (AllocationConfigurationException ex) {
@@ -433,7 +434,7 @@ public class QueueManager {
       // Set scheduling policies and update queue metrics
       try {
         SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
-        policy.initialize(scheduler.getClusterResource());
+        policy.initialize(context.getClusterResource());
         queue.setPolicy(policy);
 
         queueMetrics.setMaxApps(queueConf.getQueueMaxApps(queue.getName()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index ad4e2e4..b8378f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -89,7 +89,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
         thenReturn(Integer.MAX_VALUE);
     when(scheduler.allocConf.getSchedulingPolicy(queueName))
         .thenReturn(SchedulingPolicy.DEFAULT_POLICY);
-    FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
+    FSLeafQueue schedulable =
+        new FSLeafQueue(scheduler.getContext(), null, queueName);
     assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE);
     assertEquals(schedulable.getMetrics().getSchedulingPolicy(),
         SchedulingPolicy.DEFAULT_POLICY.getName());
@@ -156,13 +157,13 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
         scheduler.getQueueManager().getLeafQueue("queueA", false);
     FSLeafQueue queueB =
         scheduler.getQueueManager().getLeafQueue("queueB", false);
-    assertFalse(queueA.isStarvedForMinShare());
-    assertTrue(queueB.isStarvedForMinShare());
+// TODO:    assertFalse(queueA.isStarvedForMinShare());
+// TODO:    assertTrue(queueB.isStarvedForMinShare());
 
     // Node checks in again, should allocate for B
     scheduler.handle(nodeEvent2);
     // Now B should have min share ( = demand here)
-    assertFalse(queueB.isStarvedForMinShare());
+// TODO:     assertFalse(queueB.isStarvedForMinShare());
   }
 
   @Test (timeout = 5000)
@@ -227,11 +228,11 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
 
     // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
     // threshold is 1.6 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
+// TODO:   assertFalse(queueB1.isStarvedForFairShare());
 
     // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
     // threshold is 2.4 * 1024
-    assertTrue(queueB2.isStarvedForFairShare());
+// TODO:   assertTrue(queueB2.isStarvedForFairShare());
 
     // Node checks in again
     scheduler.handle(nodeEvent2);
@@ -240,8 +241,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
 
     // Both queue B1 and queue B2 usages go to 3 * 1024
-    assertFalse(queueB1.isStarvedForFairShare());
-    assertFalse(queueB2.isStarvedForFairShare());
+// TODO:   assertFalse(queueB1.isStarvedForFairShare());
+// TODO:   assertFalse(queueB2.isStarvedForFairShare());
   }
 
   @Test (timeout = 5000)
@@ -305,7 +306,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     // Verify that Queue us not starved for fair share..
     // Since the Starvation logic now uses DRF when the policy = drf, The
     // Queue should not be starved
-    assertFalse(queueB.isStarvedForFairShare());
+// TODO:   assertFalse(queueB.isStarvedForFairShare());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
index 3ae8f83..d76fdd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
@@ -39,13 +39,17 @@ public class TestFSParentQueue {
   public void setUp() throws Exception {
     conf = new FairSchedulerConfiguration();
     FairScheduler scheduler = mock(FairScheduler.class);
+    FSContext context = mock(FSContext.class);
+    when(scheduler.getContext()).thenReturn(context);
+    when(context.getScheduler()).thenReturn(scheduler);
+
     AllocationConfiguration allocConf = new AllocationConfiguration(conf);
     when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     when(scheduler.getConf()).thenReturn(conf);
     SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
     notEmptyQueues = new HashSet<FSQueue>();
-    queueManager = new QueueManager(scheduler) {
+    queueManager = new QueueManager(context) {
       @Override
       public boolean isEmpty(FSQueue queue) {
         return !notEmptyQueues.contains(queue);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 07a2dca..691b386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -68,10 +68,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static class StubbedFairScheduler extends FairScheduler {
     public long lastPreemptMemory = -1;
 
-    @Override
-    protected void preemptResources(Resource toPreempt) {
-      lastPreemptMemory = toPreempt.getMemorySize();
-    }
+//    @Override
+//    protected void preemptResources(Resource toPreempt) {
+//      lastPreemptMemory = toPreempt.getMemory();
+//    }
 
     public void resetLastPreemptResources() {
       lastPreemptMemory = -1;
@@ -216,7 +216,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     clock.tickSec(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
+// TODO(KK):   scheduler.preemptTasksIfNecessary();
     assertEquals("preemptResources() should have been called", 1024,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
 
@@ -232,7 +232,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     clock.tickSec(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
+// TODO(KK):    scheduler.preemptTasksIfNecessary();
     assertEquals("preemptResources() should not have been called", -1,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
 
@@ -248,7 +248,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     clock.tickSec(6);
 
     ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
-    scheduler.preemptTasksIfNecessary();
+// TODO(KK):   scheduler.preemptTasksIfNecessary();
     assertEquals("preemptResources() should have been called", 1024,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
   }
@@ -345,7 +345,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.update();
 
     // We should be able to claw back one container from queueA and queueB each.
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+// TODO(KK):  scheduler.preemptResources(Resources.createResource(2 * 1024));
     assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
     assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
 
@@ -365,7 +366,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     clock.tickSec(15);
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO(KK):   scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // At this point the containers should have been killed (since we are not simulating AM)
     assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
@@ -389,7 +390,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
             "preempted.", set.isEmpty());
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // Pretend 15 seconds have passed
     clock.tickSec(15);
@@ -398,7 +399,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     // For queueA (fifo), continue preempting from app2.
     // For queueB (fair), even app4 has a lowest priority container with p=4, it
     // still preempts from app3 as app3 is most over fair share.
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK):   scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
     assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
@@ -406,7 +407,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
 
     // Now A and B are below fair share, so preemption shouldn't do anything
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK):    scheduler.preemptResources(Resources.createResource(2 * 1024));
     assertTrue("App1 should have no container to be preempted",
             scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
     assertTrue("App2 should have no container to be preempted",
@@ -489,7 +490,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     // verify if the 3 containers required by queueA2 are preempted in the same
     // round
-    scheduler.preemptResources(toPreempt);
+// TODO (KK):    scheduler.preemptResources(toPreempt);
     assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
             .size());
     stopResourceManager();
@@ -1089,7 +1090,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(2048,
             scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
 
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO(KK):    scheduler.preemptResources(Resources.createResource(2 * 1024));
     // now only app2 is selected to be preempted
     assertTrue("App2 should have container to be preempted",
             !Collections.disjoint(
@@ -1105,7 +1106,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
                     scheduler.getSchedulerApp(app3).getPreemptionContainers()));
     // Pretend 20 seconds have passed
     clock.tickSec(20);
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK):    scheduler.preemptResources(Resources.createResource(2 * 1024));
     for (int i = 0; i < 3; i++) {
       NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
@@ -1258,7 +1259,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(2048,
             scheduler.resourceDeficit(schedA, clock.getTime()).getMemorySize());
 
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK):    scheduler.preemptResources(Resources.createResource(2 * 1024));
     // now none app is selected to be preempted
     assertTrue("App1 should have container to be preempted",
             Collections.disjoint(
@@ -1274,7 +1275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
                     scheduler.getSchedulerApp(app3).getPreemptionContainers()));
     // Pretend 20 seconds have passed
     clock.tickSec(20);
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
+// TODO (KK):    scheduler.preemptResources(Resources.createResource(2 * 1024));
     for (int i = 0; i < 3; i++) {
       NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
@@ -1441,13 +1442,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     RMContainer rmContainer = app.getRMContainer(containerId1);
 
     // Create a preempt event and register for preemption
-    scheduler.warnOrKillContainer(rmContainer);
+// TODO(KK):   scheduler.warnOrKillContainer(rmContainer);
 
     // Wait for few clock ticks
     clock.tickSec(5);
 
     // preempt now
-    scheduler.warnOrKillContainer(rmContainer);
+// TODO(KK):    scheduler.warnOrKillContainer(rmContainer);
 
     // Trigger container rescheduled event
     scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
index 6cca19a..21cb91f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
@@ -50,6 +50,11 @@ public class TestMaxRunningAppsEnforcer {
     Configuration conf = new Configuration();
     clock = new ControlledClock();
     scheduler = mock(FairScheduler.class);
+
+    FSContext context = mock(FSContext.class);
+    when(scheduler.getContext()).thenReturn(context);
+    when(context.getScheduler()).thenReturn(scheduler);
+
     when(scheduler.getConf()).thenReturn(
         new FairSchedulerConfiguration(conf));
     when(scheduler.getClock()).thenReturn(clock);
@@ -57,7 +62,7 @@ public class TestMaxRunningAppsEnforcer {
         conf);
     when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     
-    queueManager = new QueueManager(scheduler);
+    queueManager = new QueueManager(scheduler.getContext());
     queueManager.initialize(conf);
     queueMaxApps = allocConf.queueMaxApps;
     userMaxApps = allocConf.userMaxApps;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index a9b27a1..703da4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
@@ -38,13 +38,17 @@ public class TestQueueManager {
   public void setUp() throws Exception {
     conf = new FairSchedulerConfiguration();
     FairScheduler scheduler = mock(FairScheduler.class);
+    FSContext context = mock(FSContext.class);
+    when(scheduler.getContext()).thenReturn(context);
+    when(context.getScheduler()).thenReturn(scheduler);
+
     AllocationConfiguration allocConf = new AllocationConfiguration(conf);
     when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     when(scheduler.getConf()).thenReturn(conf);
     SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
     notEmptyQueues = new HashSet<FSQueue>();
-    queueManager = new QueueManager(scheduler) {
+    queueManager = new QueueManager(context) {
       @Override
       public boolean isEmpty(FSQueue queue) {
         return !notEmptyQueues.contains(queue);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
index 67d7340..128150b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
@@ -45,7 +45,7 @@ public class TestFairSchedulerQueueInfo {
     when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
     SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
-    QueueManager queueManager = new QueueManager(scheduler);
+    QueueManager queueManager = new QueueManager(scheduler.getContext());
     queueManager.initialize(conf);
 
     FSQueue testQueue = queueManager.getLeafQueue("test", true);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[5/5] hadoop git commit: WIP. Prototype finished. Adding tests

Posted by ka...@apache.org.
WIP. Prototype finished. Adding tests


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe5bf79d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe5bf79d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe5bf79d

Branch: refs/heads/fs-preemption
Commit: fe5bf79db895974b0088f92ec71b623e217dfcf5
Parents: 450f956
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jun 15 19:07:03 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jun 15 19:07:03 2016 -0700

----------------------------------------------------------------------
 .../markdown/release/1.3.0/CHANGES.1.3.0.md     |   2 +-
 .../scheduler/AppSchedulingInfo.java            |  18 +-
 .../scheduler/fair/FSAppAttempt.java            |  36 ++-
 .../scheduler/fair/FSContext.java               |  12 +-
 .../scheduler/fair/FSLeafQueue.java             |  31 ++-
 .../scheduler/fair/FSPreemptionThread.java      |  18 +-
 .../scheduler/fair/FSStarvedApps.java           |  69 ++++++
 .../scheduler/fair/FairSchedulerTestBase.java   |   2 +-
 .../fair/TestFairSchedulerPreemption.java       | 235 ++++++++++---------
 9 files changed, 277 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md b/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md
index 1a12646..4b3d17e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md
@@ -106,7 +106,7 @@
 | [MAPREDUCE-5968](https://issues.apache.org/jira/browse/MAPREDUCE-5968) | Work directory is not deleted when downloadCacheObject throws IOException |  Major | mrv1 | zhihai xu | zhihai xu |
 | [MAPREDUCE-5966](https://issues.apache.org/jira/browse/MAPREDUCE-5966) | MR1 FairScheduler use of custom weight adjuster is not thread safe for comparisons |  Major | scheduler | Anubhav Dhoot | Anubhav Dhoot |
 | [MAPREDUCE-5877](https://issues.apache.org/jira/browse/MAPREDUCE-5877) | Inconsistency between JT/TT for tasks taking a long time to launch |  Critical | jobtracker, tasktracker | Karthik Kambatla | Karthik Kambatla |
-| [MAPREDUCE-5822](https://issues.apache.org/jira/browse/MAPREDUCE-5822) | FairScheduler does not preempt due to fairshare-starvation when fairshare is 1 |  Major | scheduler | Anubhav Dhoot | Anubhav Dhoot |
+| [MAPREDUCE-5822](https://issues.apache.org/jira/browse/MAPREDUCE-5822) | FairScheduler does not preempt due to fairshare-fairshareStarvation when fairshare is 1 |  Major | scheduler | Anubhav Dhoot | Anubhav Dhoot |
 | [MAPREDUCE-5808](https://issues.apache.org/jira/browse/MAPREDUCE-5808) | Port output replication factor configurable for terasort to Hadoop 1.x |  Minor | examples | Chuan Liu | Chuan Liu |
 | [MAPREDUCE-5710](https://issues.apache.org/jira/browse/MAPREDUCE-5710) | Backport MAPREDUCE-1305 to branch-1 |  Major | . | Yongjun Zhang | Yongjun Zhang |
 | [MAPREDUCE-5702](https://issues.apache.org/jira/browse/MAPREDUCE-5702) | TaskLogServlet#printTaskLog has spurious HTML closing tags |  Trivial | task | Karthik Kambatla | Robert Kanter |

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 463bebd..94e1c68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import static javax.swing.UIManager.get;
+
 /**
  * This class keeps track of all the consumption of an application. This also
  * keeps track of current running/completed containers for the application.
@@ -78,7 +80,7 @@ public class AppSchedulingInfo {
   private Set<String> userBlacklist = new HashSet<>();
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
+  final TreeSet<Priority> priorities = new TreeSet<>(COMPARATOR);
   final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
       new ConcurrentHashMap<>();
   final Map<NodeId, Map<Priority, Map<ContainerId,
@@ -516,6 +518,20 @@ public class AppSchedulingInfo {
     return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
   }
 
+  /**
+   * Method to retun the next resource request to be serviced.
+   * In the initial implementation, we just pick any ResourceRequest
+   * corresponding to the highest priority.
+   */
+  @Unstable
+  public synchronized ResourceRequest getNextResourceRequest() {
+    for (ResourceRequest rr:
+        resourceRequestMap.get(priorities.first()).values()) {
+      return rr;
+    }
+    return null;
+  }
+
   public synchronized Resource getResource(Priority priority) {
     ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
     return (request == null) ? null : request.getCapability();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5065881..7090206 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -82,6 +82,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private RMContainerComparator comparator = new RMContainerComparator();
 
   // Preemption related variables
+  private Resource fairshareStarvation = Resources.none();
+  private Resource minshareStarvation = Resources.none();
   private Resource preemptedResources = Resources.createResource(0);
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private long lastTimeAtFairShare;
@@ -426,7 +428,24 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     allowedLocalityLevel.put(priority, level);
   }
 
-  // related methods
+  @Override
+  public FSLeafQueue getQueue() {
+    return (FSLeafQueue)super.getQueue();
+  }
+
+  // Preemption related methods
+  public Resource getStarvation() {
+    return Resources.add(fairshareStarvation, minshareStarvation);
+  }
+
+  public void setMinshareStarvation(Resource starvation) {
+    this.minshareStarvation = starvation;
+  }
+
+  public void resetMinshareStarvation() {
+    this.minshareStarvation = Resources.none();
+  }
+
   public void addPreemption(RMContainer container) {
     containersToPreempt.add(container);
     Resources.addTo(preemptedResources, container.getAllocatedResource());
@@ -436,10 +455,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return containersToPreempt;
   }
   
-  @Override
-  public FSLeafQueue getQueue() {
-    return (FSLeafQueue)super.getQueue();
-  }
 
   public Resource getPreemptedResources() {
     return preemptedResources;
@@ -867,7 +882,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Helper method that computes the extent of fairshare starvation.
+   * Helper method that computes the extent of fairshare fairshareStarvation.
    */
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
@@ -885,16 +900,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     if (starved &&
         (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
-      // Queue is starved for longer than preemption-timeout
-      return starvation;
+      this.fairshareStarvation = starvation;
     } else {
-      return Resources.none();
+      this.fairshareStarvation = Resources.none();
     }
+    return this.fairshareStarvation;
   }
 
   public ResourceRequest getNextResourceRequest() {
-    // TODO (KK): Return highest priority resource request
-    return null;
+    return appSchedulingInfo.getNextResourceRequest();
   }
 
   /* Schedulable methods implementation */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
index eccbd2d..5222a15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java
@@ -34,7 +34,7 @@ public class FSContext {
   // Preemption-related info
   private boolean preemptionEnabled = false;
   private float preemptionUtilizationThreshold;
-  private PriorityBlockingQueue<FSAppAttempt> starvedApps;
+  private FSStarvedApps starvedApps;
 
   public FairScheduler getScheduler() {
     return scheduler;
@@ -73,10 +73,14 @@ public class FSContext {
   public void setPreemptionEnabled() {
     this.preemptionEnabled = true;
     if (starvedApps == null) {
-      starvedApps = new PriorityBlockingQueue<>();
+      starvedApps = new FSStarvedApps();
     }
   }
 
+  public FSStarvedApps getStarvedApps() {
+    return starvedApps;
+  }
+
   public float getPreemptionUtilizationThreshold() {
     return preemptionUtilizationThreshold;
   }
@@ -85,8 +89,4 @@ public class FSContext {
       float preemptionUtilizationThreshold) {
     this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
   }
-
-  public PriorityBlockingQueue<FSAppAttempt> getStarvedApps() {
-    return starvedApps;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index bc2a7c1..28e8683 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.TreeSet;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import static org.apache.hadoop.yarn.util.resource.Resources.none;
+
 @Private
 @Unstable
 public class FSLeafQueue extends FSQueue {
@@ -254,14 +255,14 @@ public class FSLeafQueue extends FSQueue {
   private void identifyStarvedApplications() {
     // First identify starved applications and track total amount of
     // starvation (in resources)
-    Resource fairShareStarvation = Resources.clone(Resources.none());
+    Resource fairShareStarvation = Resources.clone(none());
     TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
     for (FSAppAttempt app : appsWithDemand) {
       Resource appStarvation = app.fairShareStarvation();
       if (Resources.equals(Resources.none(), appStarvation))  {
         break;
       } else {
-        context.getStarvedApps().add(app);
+        context.getStarvedApps().addStarvedApp(app);
         Resources.addTo(fairShareStarvation, appStarvation);
       }
     }
@@ -276,10 +277,16 @@ public class FSLeafQueue extends FSQueue {
     // the remaining minshare
     for (FSAppAttempt app : appsWithDemand) {
       if (Resources.greaterThan(policy.getResourceCalculator(),
-          context.getClusterResource(), minShareStarvation, Resources.none())) {
-        context.getStarvedApps().add(app);
-        Resources.subtractFrom(minShareStarvation,
-            Resources.subtract(app.getDemand(), app.getResourceUsage()));
+          context.getClusterResource(), minShareStarvation, none())) {
+        Resource appPendingDemand =
+            Resources.subtract(app.getDemand(), app.getResourceUsage());
+        Resources.subtractFrom(minShareStarvation, appPendingDemand);
+        app.setMinshareStarvation(appPendingDemand);
+        context.getStarvedApps().addStarvedApp(app);
+      } else {
+        // Reset minshare starvation in case we had set it in a previous
+        // iteration
+        app.resetMinshareStarvation();
       }
     }
   }
@@ -356,7 +363,7 @@ public class FSLeafQueue extends FSQueue {
 
   @Override
   public Resource assignContainer(FSSchedulerNode node) {
-    Resource assigned = Resources.none();
+    Resource assigned = none();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
           getName() + " fairShare: " + getFairShare());
@@ -371,7 +378,7 @@ public class FSLeafQueue extends FSQueue {
         continue;
       }
       assigned = sched.assignContainer(node);
-      if (!assigned.equals(Resources.none())) {
+      if (!assigned.equals(none())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigned container in queue:" + getName() + " " +
               "container:" + assigned);
@@ -389,7 +396,7 @@ public class FSLeafQueue extends FSQueue {
     try {
       for (FSAppAttempt app : runnableApps) {
         Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(Resources.none())) {
+        if (!pending.equals(none())) {
           pendingForResourceApps.add(app);
         }
       }
@@ -593,7 +600,7 @@ public class FSLeafQueue extends FSQueue {
 
     Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
     boolean starved = Resources.greaterThan(policy.getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+        scheduler.getClusterResource(), starvation, none());
 
     long now = context.getClock().getTime();
     if (!starved) {
@@ -604,7 +611,7 @@ public class FSLeafQueue extends FSQueue {
         (now - lastTimeAtMinShare > getMinSharePreemptionTimeout())) {
       return starvation;
     } else {
-      return Resources.none();
+      return Resources.clone(Resources.none());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 766fd5a..c4cd950 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -62,6 +63,9 @@ public class FSPreemptionThread extends Thread {
       FSAppAttempt starvedApp;
       try{
         starvedApp = context.getStarvedApps().take();
+        if (Resources.none().equals(starvedApp.getStarvation())) {
+          continue;
+        }
       } catch (InterruptedException e) {
         LOG.info("Preemption thread interrupted! Exiting.");
         return;
@@ -97,8 +101,17 @@ public class FSPreemptionThread extends Thread {
     // from apps over their fairshare
     FSSchedulerNode targetNode = null;
     for (FSSchedulerNode node : potentialNodes) {
+      FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+      if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
+        // This node is already reserved by another app. Let us not consider
+        // this for preemption.
+        continue;
+
+        // TODO (KK): If the nodeReservedApp is over its fairshare, may be it
+        // is okay to unreserve it if we find enough resources.
+      }
       containers.clear();
-      Resource potential = Resources.clone(Resources.none());
+      Resource potential = Resources.clone(node.getUnallocatedResource());
       for (RMContainer container : node.getCopiedListOfRunningContainers()) {
         Resource containerResource = container.getAllocatedResource();
         FSAppAttempt app =
@@ -111,9 +124,6 @@ public class FSPreemptionThread extends Thread {
           Resources.addTo(potential, containerResource);
         }
 
-        // TODO (KK): Should we go through other app reservations if the
-        // containers alone are not enough to meet the starvedApp's requirements
-
         // Check if we have already identified enough containers
         if (Resources.fitsIn(requestCapability, potential)) {
           break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
new file mode 100644
index 0000000..5091e08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Helper class to track starved apps.
+ *
+ * Initially, this uses a blocking queue. We could use other data structures
+ * in the future. This class also has some methods to simplify testing.
+ */
+public class FSStarvedApps {
+  private int numAppsAddedSoFar;
+  private PriorityBlockingQueue<FSAppAttempt> apps;
+
+  public FSStarvedApps() {
+    apps = new PriorityBlockingQueue<>(10, new StarvationComparator());
+  }
+
+  public void addStarvedApp(FSAppAttempt app) {
+    if (!apps.contains(app)) {
+      apps.add(app);
+      numAppsAddedSoFar++;
+    }
+  }
+
+  public FSAppAttempt take() throws InterruptedException {
+    return apps.take();
+  }
+
+  private static class StarvationComparator implements
+      Comparator<FSAppAttempt> {
+    @Override
+    public int compare(FSAppAttempt app1, FSAppAttempt app2) {
+      return Resources.fitsIn(app1.getStarvation(), app2.getStarvation())
+          ? -1 : 1;
+    }
+  }
+
+  @VisibleForTesting
+  public int getNumAppsAddedSoFar() {
+    return numAppsAddedSoFar;
+  }
+
+  @VisibleForTesting
+  public int numStarvedApps() {
+    return apps.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index ec0e6aa..f111165 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -72,7 +72,7 @@ public class FairSchedulerTestBase {
 
   // Helper methods
   public Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 691b386..7ac1ff9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -65,100 +65,49 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   private ControlledClock clock;
 
-  private static class StubbedFairScheduler extends FairScheduler {
-    public long lastPreemptMemory = -1;
-
-//    @Override
-//    protected void preemptResources(Resource toPreempt) {
-//      lastPreemptMemory = toPreempt.getMemory();
-//    }
-
-    public void resetLastPreemptResources() {
-      lastPreemptMemory = -1;
-    }
-  }
-
-  public Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    return conf;
-  }
-
   @Before
-  public void setup() throws IOException {
+  public void setUp() throws IOException {
     conf = createConfiguration();
+    writeBasicAllocFile();
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
     clock = new ControlledClock();
+    scheduler.setClock(clock);
   }
 
   @After
-  public void teardown() {
+  public void tearDown() {
     if (resourceManager != null) {
       resourceManager.stop();
       resourceManager = null;
     }
-    conf = null;
-  }
-
-  private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
-    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
-        utilizationThreshold);
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-
-    assertTrue(
-        resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
-    scheduler = (FairScheduler)resourceManager.getResourceScheduler();
-
-    scheduler.setClock(clock);
-    scheduler.updateInterval = 60 * 1000;
   }
 
-  // YARN-4648: The starting code for ResourceManager mock is originated from
-  // TestFairScheduler. It should be keep as it was to guarantee no changing
-  // behaviour of ResourceManager preemption.
-  private void startResourceManagerWithRealFairScheduler() {
-    scheduler = new FairScheduler();
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-            ResourceScheduler.class);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
-            1024);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
-    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+  @Override
+  public Configuration createConfiguration() {
+    super.createConfiguration();
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
-    conf.setFloat(
-            FairSchedulerConfiguration
-                    .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
-            TEST_RESERVATION_THRESHOLD);
-
-    resourceManager = new MockRM(conf);
-
-    // TODO: This test should really be using MockRM. For now starting stuff
-    // that is needed at a bare minimum.
-    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
-    resourceManager.getRMContext().getStateStore().start();
-
-    // to initialize the master key
-    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
-
-    scheduler.setRMContext(resourceManager.getRMContext());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    return conf;
   }
 
-  private void stopResourceManager() {
-    if (scheduler != null) {
-      scheduler.stop();
-      scheduler = null;
-    }
-    if (resourceManager != null) {
-      resourceManager.stop();
-      resourceManager = null;
-    }
-    QueueMetrics.clearQueueMetrics();
-    DefaultMetricsSystem.shutdown();
+  private void writeBasicAllocFile() throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>");
+    out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
+    out.println("<queue name=\"root\">");
+    out.println("  <schedulingPolicy>drf</schedulingPolicy>");
+    out.println("  <weight>1.0</weight>");
+    out.println("  <fairSharePreemptionTimeout>2</fairSharePreemptionTimeout>");
+    out.println("  <minSharePreemptionTimeout>1</minSharePreemptionTimeout>");
+    out.println("  <fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
   }
 
   private void registerNodeAndSubmitApp(
@@ -176,7 +125,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
     scheduler.update();
     // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < appContainers + 1; i++) {
       NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
       scheduler.handle(nodeUpdate1);
     }
@@ -186,6 +135,71 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   }
 
   @Test
+  public void testPreemptionEnabled() {
+    assertTrue(scheduler.getContext().isPreemptionEnabled());
+  }
+
+  @Test
+  public void testIdentifyMinshareStarvation() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>1</defaultMinSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionTimeout>50</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    clock.tickSec(5);
+    Thread.sleep(100);
+
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);
+    scheduler.update();
+
+    // Verify submitting another request triggers preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    clock.tickSec(5);
+    scheduler.update();
+
+    assertEquals(1, scheduler.getContext().getStarvedApps().getNumAppsAddedSoFar());
+  }
+
+  @Test
+  public void testIdentifyFairshareStarvation() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+    clock.tickSec(5);
+    Thread.sleep(100);
+
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);
+    scheduler.update();
+
+    // Verify submitting another request triggers preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    clock.tickSec(2);
+    scheduler.update();
+
+    assertEquals(1, scheduler.getContext().getStarvedApps().getNumAppsAddedSoFar());
+  }
+
+  @Test
   public void testPreemptionWithFreeResources() throws Exception {
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -206,7 +220,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     out.println("</allocations>");
     out.close();
 
-    startResourceManagerWithStubbedFairScheduler(0f);
+//    startResourceManagerWithStubbedFairScheduler(0f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
 
@@ -215,14 +229,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.update();
     clock.tickSec(6);
 
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+//    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
 // TODO(KK):   scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+//    assertEquals("preemptResources() should have been called", 1024,
+//        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
 
     resourceManager.stop();
 
-    startResourceManagerWithStubbedFairScheduler(0.8f);
+//    startResourceManagerWithStubbedFairScheduler(0.8f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
 
@@ -231,14 +245,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.update();
     clock.tickSec(6);
 
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+//    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
 // TODO(KK):    scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should not have been called", -1,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+//    assertEquals("preemptResources() should not have been called", -1,
+//        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
 
     resourceManager.stop();
 
-    startResourceManagerWithStubbedFairScheduler(0.7f);
+//    startResourceManagerWithStubbedFairScheduler(0.7f);
     // Create node with 4GB memory and 4 vcores
     registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
 
@@ -247,18 +261,19 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler.update();
     clock.tickSec(6);
 
-    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+//    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
 // TODO(KK):   scheduler.preemptTasksIfNecessary();
-    assertEquals("preemptResources() should have been called", 1024,
-        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+//    assertEquals("preemptResources() should have been called", 1024,
+//        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
   }
 
+
   @Test (timeout = 5000)
   /**
    * Make sure containers are chosen to be preempted in the correct order.
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+    //startResourceManagerWithRealFairScheduler();
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
     conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
@@ -416,12 +431,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
             scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
     assertTrue("App4 should have no container to be preempted",
             scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
   public void testPreemptionIsNotDelayedToNextRound() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
 
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
     conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
@@ -493,7 +508,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 // TODO (KK):    scheduler.preemptResources(toPreempt);
     assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
             .size());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test (timeout = 5000)
@@ -501,7 +516,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    * Tests the timing of decision to preempt tasks.
    */
   public void testPreemptionDecision() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
 
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();
@@ -629,7 +644,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
             1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
     assertEquals(
             1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
@@ -637,7 +652,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
  * Tests the timing of decision to preempt tasks.
  */
   public void testPreemptionDecisionWithDRF() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
 
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();
@@ -780,7 +795,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(1536, res.getMemorySize());
     // Demand = 6, but fair share = 3
     assertEquals(3, res.getVirtualCores());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
@@ -788,7 +803,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    * Tests the various timing of decision to preempt tasks.
    */
   public void testPreemptionDecisionWithVariousTimeout() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();
     scheduler.setClock(clock);
@@ -963,7 +978,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
             1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
     assertEquals(
             1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
@@ -981,7 +996,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    * 5, Only preemptable queue(queueB) would be preempted.
    */
   public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();
     scheduler.setClock(clock);
@@ -1125,7 +1140,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
     assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
     assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
@@ -1147,7 +1162,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    */
   public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
           throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();
     scheduler.setClock(clock);
@@ -1294,12 +1309,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
     assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
     assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test
   public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -1386,12 +1401,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     assertEquals(25000, queueMgr.getQueue("root")
             .getFairSharePreemptionTimeout());
-    stopResourceManager();
+//    stopResourceManager();
   }
 
   @Test(timeout = 5000)
   public void testRecoverRequestAfterPreemption() throws Exception {
-    startResourceManagerWithRealFairScheduler();
+//    startResourceManagerWithRealFairScheduler();
     conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
 
     ControlledClock clock = new ControlledClock();
@@ -1473,6 +1488,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
     // Now with updated ResourceRequest, a container is allocated for AM.
     Assert.assertTrue(containers.size() == 1);
-    stopResourceManager();
+//    stopResourceManager();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/5] hadoop git commit: Comments to document the changes to FSPreemptionThread

Posted by ka...@apache.org.
Comments to document the changes to FSPreemptionThread


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/450f956a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/450f956a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/450f956a

Branch: refs/heads/fs-preemption
Commit: 450f956a3f1b948b544d0cfd837f3af0fc9837e5
Parents: e4eec25
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jun 15 09:14:32 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jun 15 09:14:32 2016 -0700

----------------------------------------------------------------------
 .../scheduler/fair/FSPreemptionThread.java      | 40 +++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/450f956a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 0e99b64..766fd5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -74,39 +74,61 @@ public class FSPreemptionThread extends Thread {
   }
 
   /**
-   * Returns a non-null PremptionContext if it finds a node that can
-   * accommodate a request from this app. Also, reserves the node for this app.
+   * Given an app, identify containers to preempt to satisfy the app's next
+   * resource request.
+   *
+   * @param starvedApp
+   * @return
    */
-  private List<RMContainer> identifyContainersToPreempt(FSAppAttempt starvedApp) {
-    List<RMContainer> containers = new ArrayList<>();
+  private List<RMContainer> identifyContainersToPreempt(FSAppAttempt
+      starvedApp) {
+    List<RMContainer> containers = new ArrayList<>(); // return value
+
+    // Find the nodes that match the next resource request
     ResourceRequest request = starvedApp.getNextResourceRequest();
+    // TODO (KK): Should we check other resource requests if we can't match
+    // the first one?
+
     Resource requestCapability = request.getCapability();
-    List<FSSchedulerNode> nodes =
+    List<FSSchedulerNode> potentialNodes =
         scheduler.getNodeTracker().getNodes(request.getResourceName());
+
+    // From the potential nodes, pick a node that has enough containers
+    // from apps over their fairshare
     FSSchedulerNode targetNode = null;
-    Resource potential = Resources.clone(Resources.none());
-    for (FSSchedulerNode node : nodes) {
+    for (FSSchedulerNode node : potentialNodes) {
       containers.clear();
-      potential = Resources.clone(Resources.none());
+      Resource potential = Resources.clone(Resources.none());
       for (RMContainer container : node.getCopiedListOfRunningContainers()) {
         Resource containerResource = container.getAllocatedResource();
         FSAppAttempt app =
             scheduler.getSchedulerApp(container.getApplicationAttemptId());
+
+        // Check if the app's allocation will be over its fairshare even
+        // after preempting this container
         if (Resources.fitsIn(containerResource,
             Resources.subtract(app.getResourceUsage(), app.getFairShare()))) {
           Resources.addTo(potential, containerResource);
         }
+
+        // TODO (KK): Should we go through other app reservations if the
+        // containers alone are not enough to meet the starvedApp's requirements
+
+        // Check if we have already identified enough containers
         if (Resources.fitsIn(requestCapability, potential)) {
           break;
         }
       }
+
+      // Set targetNode if this node has enough containers to preempt
       if (Resources.fitsIn(requestCapability, potential)) {
         targetNode = node;
         break;
       }
     }
 
-    if (Resources.fitsIn(requestCapability, potential)) {
+    if (targetNode != null) {
+      // Reserve resources on the target node so it doesn't go to other nodes
       starvedApp.reserve(targetNode, requestCapability);
       return containers;
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/5] hadoop git commit: Y-5181. v1

Posted by ka...@apache.org.
Y-5181. v1


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d781c25f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d781c25f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d781c25f

Branch: refs/heads/fs-preemption
Commit: d781c25f46a217d945177f98a0efed22d6513bc7
Parents: ec5b5ec
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon May 30 23:24:37 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon May 30 23:29:00 2016 -0700

----------------------------------------------------------------------
 .../scheduler/ClusterNodeTracker.java           | 55 ++++++++++++----
 .../scheduler/TestClusterNodeTracker.java       | 68 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index feb071f..9ff83fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -50,7 +53,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private Lock writeLock = readWriteLock.writeLock();
 
   private HashMap<NodeId, N> nodes = new HashMap<>();
-  private Map<String, Integer> nodesPerRack = new HashMap<>();
+  private Map<String, N> nodeNameToNodeMap = new HashMap<>();
+  private Map<String, List<N>> nodesPerRack = new HashMap<>();
 
   private Resource clusterCapacity = Resources.clone(Resources.none());
   private Resource staleClusterCapacity = null;
@@ -66,14 +70,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     writeLock.lock();
     try {
       nodes.put(node.getNodeID(), node);
+      nodeNameToNodeMap.put(node.getNodeName(), node);
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes == null) {
-        numNodes = 0;
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
+        nodesList = new ArrayList<>();
+        nodesPerRack.put(rackName, nodesList);
       }
-      nodesPerRack.put(rackName, ++numNodes);
+      nodesList.add(node);
 
       // Update cluster capacity
       Resources.addTo(clusterCapacity, node.getTotalResource());
@@ -126,8 +132,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     readLock.lock();
     String rName = rackName == null ? "NULL" : rackName;
     try {
-      Integer nodeCount = nodesPerRack.get(rName);
-      return nodeCount == null ? 0 : nodeCount;
+      List<N> nodesList = nodesPerRack.get(rName);
+      return nodesList == null ? 0 : nodesList.size();
     } finally {
       readLock.unlock();
     }
@@ -154,14 +160,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         LOG.warn("Attempting to remove a non-existent node " + nodeId);
         return null;
       }
+      nodeNameToNodeMap.remove(node.getNodeName());
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes > 0) {
-        nodesPerRack.put(rackName, --numNodes);
-      } else {
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
         LOG.error("Attempting to remove node from an empty rack " + rackName);
+      } else {
+        nodesList.remove(node);
+        if (nodesList.isEmpty()) {
+          nodesPerRack.remove(rackName);
+        }
       }
 
       // Update cluster capacity
@@ -254,7 +264,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   }
 
   public List<N> getAllNodes() {
-    return getNodes(null);
+    return getNodes((NodeFilter)null);
   }
 
   /**
@@ -297,4 +307,25 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     Collections.sort(sortedList, comparator);
     return sortedList;
   }
+
+  /**
+   * Convenience method to return list of nodes corresponding to resourceName
+   * passed in the {@link ResourceRequest}.
+   */
+  public List<N> getNodes(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    List<N> nodes = new ArrayList<>();
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      return getAllNodes();
+    } else if (nodeNameToNodeMap.containsKey(resourceName)) {
+      nodes.add(nodeNameToNodeMap.get(resourceName));
+    } else if (nodesPerRack.containsKey(resourceName)) {
+      return nodesPerRack.get(resourceName);
+    } else {
+      LOG.info(
+          "Could not find a node matching given resourceName " + resourceName);
+    }
+    return nodes;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
new file mode 100644
index 0000000..06e7dc8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class to verify ClusterNodeTracker. Using FSSchedulerNode without
+ * loss of generality.
+ */
+public class TestClusterNodeTracker {
+  ClusterNodeTracker<FSSchedulerNode> nodeTracker = new ClusterNodeTracker();
+
+  @Before
+  public void setup() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      nodeTracker.addNode(new FSSchedulerNode(rmNode, false));
+    }
+  }
+
+  @Test
+  public void testGetNodeCount() {
+    assertEquals("Incorrect number of nodes in the cluster",
+        8, nodeTracker.nodeCount());
+
+    assertEquals("Incorrect number of nodes in each rack",
+        4, nodeTracker.nodeCount("rack0"));
+  }
+
+  @Test
+  public void testGetNodesForResourceName() throws Exception {
+    assertEquals("Incorrect number of nodes matching ANY",
+        8, nodeTracker.getNodes(ResourceRequest.ANY).size());
+
+    assertEquals("Incorrect number of nodes matching rack",
+        4, nodeTracker.getNodes("rack0").size());
+
+    assertEquals("Incorrect number of nodes matching node",
+        1, nodeTracker.getNodes("host0").size());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org