Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/04/12 23:42:34 UTC

[47/50] hadoop git commit: YARN-6432. FairScheduler: Reserve preempted resources for corresponding applications. (Miklos Szegedi via kasha)

YARN-6432. FairScheduler: Reserve preempted resources for corresponding applications. (Miklos Szegedi via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3375175
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3375175
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3375175

Branch: refs/heads/HDFS-7240
Commit: c3375175d616e0380560f89d491b6b9753a8f3e1
Parents: 9d9087a
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Apr 12 14:17:13 2017 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Apr 12 14:21:20 2017 -0700

----------------------------------------------------------------------
 .../rmcontainer/RMContainer.java                |   3 +-
 .../rmcontainer/RMContainerImpl.java            |   2 +-
 .../scheduler/SchedulerNode.java                |   2 +-
 .../scheduler/fair/FSAppAttempt.java            |   8 +-
 .../scheduler/fair/FSPreemptionThread.java      |  25 +-
 .../scheduler/fair/FSSchedulerNode.java         | 133 +++++-
 .../scheduler/fair/FairScheduler.java           |  41 +-
 .../scheduler/fair/TestFSSchedulerNode.java     | 403 +++++++++++++++++++
 .../fair/TestFairSchedulerPreemption.java       |  19 +
 9 files changed, 597 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
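In brief, the patch teaches each FSSchedulerNode to remember which starved application the preemption thread freed containers for, and how much resource is still owed to it; later allocations by that application settle the debt. A minimal sketch of that bookkeeping, using made-up simplified types instead of the actual Hadoop classes and tracking memory only:

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Simplified sketch of the per-node "preempted for app" ledger. */
    class PreemptionLedger<App> {
      // FIFO map: app -> memory (MB) still owed from preempted containers.
      private final Map<App, Long> owedMb = new LinkedHashMap<>();
      private long totalOwedMb = 0;

      /** Preemption thread: record that mb was preempted for app. */
      synchronized void addPreemptedFor(App app, long mb) {
        owedMb.merge(app, mb, Long::sum);
        totalOwedMb += mb;
      }

      /** Scheduler: app was allocated mb on this node; settle the debt. */
      synchronized void onAllocated(App app, long mb) {
        Long owed = owedMb.get(app);
        if (owed == null) {
          return;                      // nothing was reserved for this app
        }
        long fulfilled = Math.min(owed, mb);
        totalOwedMb -= fulfilled;
        if (owed == fulfilled) {
          owedMb.remove(app);          // debt fully repaid
        } else {
          owedMb.put(app, owed - fulfilled);
        }
      }

      /** Amount other apps must not take on this node. */
      synchronized long totalOwed() {
        return totalOwedMb;
      }
    }

The actual patch keeps the same shape in FSSchedulerNode (resourcesPreemptedForApp and totalResourcesPreempted, below), but over full Resource vectors rather than a single number.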


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 7ad381e..29680e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
  * when resources are being reserved to fill space for a future container 
  * allocation.
  */
-public interface RMContainer extends EventHandler<RMContainerEvent> {
+public interface RMContainer extends EventHandler<RMContainerEvent>,
+    Comparable<RMContainer> {
 
   ContainerId getContainerId();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 12fbbea..1e9463a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
+public class RMContainerImpl implements RMContainer {
 
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index af4a001..272537c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -160,7 +160,7 @@ public abstract class SchedulerNode {
    * @param rmContainer Allocated container
    * @param launchedOnNode True if the container has been launched
    */
-  private synchronized void allocateContainer(RMContainer rmContainer,
+  protected synchronized void allocateContainer(RMContainer rmContainer,
       boolean launchedOnNode) {
     Container container = rmContainer.getContainer();
     if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
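Relaxing allocateContainer from private to protected is what allows FSSchedulerNode, later in this patch, to override it and hook preemption bookkeeping into the allocation path while keeping the base accounting. A minimal sketch of the pattern, with invented class names and signatures:

    class BaseNode {
      protected synchronized void allocateContainer(long containerId, long mb) {
        // base bookkeeping: update allocated/unallocated resources, etc.
      }
    }

    class FairNode extends BaseNode {
      @Override
      protected synchronized void allocateContainer(long containerId, long mb) {
        super.allocateContainer(containerId, mb); // keep the base accounting
        // extra step added by this patch: settle any preemption "debt" the
        // allocating app holds on this node (see FSSchedulerNode below)
      }
    }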

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index e0dfb73..a1c4b4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -647,7 +647,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       Container reservedContainer, NodeType type,
       SchedulerRequestKey schedulerKey) {
 
-    if (!reservationExceedsThreshold(node, type)) {
+    RMContainer nodeReservedContainer = node.getReservedContainer();
+    boolean reservableForThisApp = nodeReservedContainer == null ||
+        nodeReservedContainer.getApplicationAttemptId()
+            .equals(getApplicationAttemptId());
+    if (reservableForThisApp && !reservationExceedsThreshold(node, type)) {
       LOG.info("Making reservation: node=" + node.getNodeName() +
               " app_id=" + getApplicationId());
       if (reservedContainer == null) {
@@ -1139,7 +1143,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   /**
    * Is application starved for fairshare or minshare
    */
-  private boolean isStarved() {
+  boolean isStarved() {
     return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 65df0c2..efe36a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -113,11 +113,6 @@ class FSPreemptionThread extends Thread {
         List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
             .getNodesByResourceName(rr.getResourceName());
         for (FSSchedulerNode node : potentialNodes) {
-          // TODO (YARN-5829): Attempt to reserve the node for starved app.
-          if (isNodeAlreadyReserved(node, starvedApp)) {
-            continue;
-          }
-
           int maxAMContainers = bestContainers == null ?
               Integer.MAX_VALUE : bestContainers.numAMContainers;
           PreemptableContainers preemptableContainers =
@@ -134,7 +129,8 @@ class FSPreemptionThread extends Thread {
 
         if (bestContainers != null && bestContainers.containers.size() > 0) {
           containersToPreempt.addAll(bestContainers.containers);
-          trackPreemptionsAgainstNode(bestContainers.containers);
+          // Reserve the containers for the starved app
+          trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
         }
       }
     } // End of iteration over RRs
@@ -163,8 +159,10 @@ class FSPreemptionThread extends Thread {
         node.getRunningContainersWithAMsAtTheEnd();
     containersToCheck.removeAll(node.getContainersForPreemption());
 
-    // Initialize potential with unallocated resources
-    Resource potential = Resources.clone(node.getUnallocatedResource());
+    // Initialize potential with unallocated but not reserved resources
+    Resource potential = Resources.subtractFromNonNegative(
+        Resources.clone(node.getUnallocatedResource()),
+        node.getTotalReserved());
 
     for (RMContainer container : containersToCheck) {
       FSAppAttempt app =
@@ -182,8 +180,6 @@ class FSPreemptionThread extends Thread {
       // Check if we have already identified enough containers
       if (Resources.fitsIn(request, potential)) {
         return preemptableContainers;
-      } else {
-        // TODO (YARN-5829): Unreserve the node for the starved app.
       }
     }
     return null;
@@ -195,10 +191,11 @@ class FSPreemptionThread extends Thread {
     return nodeReservedApp != null && !nodeReservedApp.equals(app);
   }
 
-  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers,
+                                           FSAppAttempt app) {
     FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
         .getNode(containers.get(0).getNodeId());
-    node.addContainersForPreemption(containers);
+    node.addContainersForPreemption(containers, app);
   }
 
   private void preemptContainers(List<RMContainer> containers) {
@@ -232,10 +229,6 @@ class FSPreemptionThread extends Thread {
         LOG.info("Killing container " + container);
         scheduler.completedContainer(
             container, status, RMContainerEventType.KILL);
-
-        FSSchedulerNode containerNode = (FSSchedulerNode)
-            scheduler.getNodeTracker().getNode(container.getAllocatedNode());
-        containerNode.removeContainerForPreemption(container);
       }
     }
   }
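A key change above is that the search for preemptable containers now seeds its "potential" with the node's unallocated resources minus whatever is already reserved there, clamped at zero per component via Resources.subtractFromNonNegative. A worked example with assumed plain numbers in place of the Resource type:

    /** Worked example of the clamped subtraction that seeds "potential". */
    class PotentialDemo {
      public static void main(String[] args) {
        long[] unallocated = {4096, 4}; // <4096 MB, 4 vcores> free on the node
        long[] reserved    = {6144, 2}; // already promised to starved apps
        long[] potential   = new long[2];
        for (int i = 0; i < potential.length; i++) {
          // clamp at zero, mirroring Resources.subtractFromNonNegative
          potential[i] = Math.max(0, unallocated[i] - reserved[i]);
        }
        // potential == {0, 2}: no memory left to offer this starved app, so
        // more containers must be preempted before its request fits.
        System.out.println(potential[0] + " MB, " + potential[1] + " vcores");
      }
    }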

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index d983ea0..663e3c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -18,18 +18,26 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
@@ -38,15 +46,38 @@ import java.util.concurrent.ConcurrentSkipListSet;
 public class FSSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
-
   private FSAppAttempt reservedAppSchedulable;
-  private final Set<RMContainer> containersForPreemption =
+  // Stores list of containers still to be preempted
+  @VisibleForTesting
+  final Set<RMContainer> containersForPreemption =
       new ConcurrentSkipListSet<>();
+  // Stores amount of resources preempted and reserved for each app
+  @VisibleForTesting
+  final Map<FSAppAttempt, Resource>
+      resourcesPreemptedForApp = new LinkedHashMap<>();
+  private final Map<ApplicationAttemptId, FSAppAttempt> appIdToAppMap =
+      new HashMap<>();
+  // Sum of the values in resourcesPreemptedForApp: the total resources
+  // slated for preemption on this node
+  private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
   }
 
+  /**
+   * Total amount of reserved resources including reservations and preempted
+   * containers.
+   * @return total resources reserved
+   */
+  Resource getTotalReserved() {
+    Resource totalReserved = Resources.clone(getReservedContainer() != null
+        ? getReservedContainer().getAllocatedResource()
+        : Resource.newInstance(0, 0));
+    Resources.addTo(totalReserved, totalResourcesPreempted);
+    return totalReserved;
+  }
+
   @Override
   public synchronized void reserveResource(
       SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
@@ -110,16 +141,55 @@ public class FSSchedulerNode extends SchedulerNode {
   }
 
   /**
+   * List the resources reserved for each application after preemption,
+   * in FIFO order.
+   * @return map of applications to the resources reserved for them
+   */
+  @VisibleForTesting
+  synchronized LinkedHashMap<FSAppAttempt, Resource> getPreemptionList() {
+    cleanupPreemptionList();
+    return new LinkedHashMap<>(resourcesPreemptedForApp);
+  }
+
+  /**
+   * Remove apps that have their preemption requests fulfilled.
+   */
+  private synchronized void cleanupPreemptionList() {
+    Iterator<FSAppAttempt> iterator =
+        resourcesPreemptedForApp.keySet().iterator();
+    while (iterator.hasNext()) {
+      FSAppAttempt app = iterator.next();
+      if (app.isStopped() || !app.isStarved()) {
+        // App does not need more resources
+        Resources.subtractFrom(totalResourcesPreempted,
+            resourcesPreemptedForApp.get(app));
+        appIdToAppMap.remove(app.getApplicationAttemptId());
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
    * Mark {@code containers} as being considered for preemption so they are
    * not considered again. A call to this requires a corresponding call to
-   * {@link #removeContainerForPreemption} to ensure we do not mark a
-   * container for preemption and never consider it again and avoid memory
-   * leaks.
+   * {@code releaseContainer} so that a container marked for preemption is
+   * eventually cleaned up; otherwise it would never be considered again
+   * and its entry would leak.
    *
   * @param containers containers to mark
+   * @param app application for which the containers are being preempted
    */
-  void addContainersForPreemption(Collection<RMContainer> containers) {
-    containersForPreemption.addAll(containers);
+  void addContainersForPreemption(Collection<RMContainer> containers,
+                                  FSAppAttempt app) {
+
+    appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+    resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
+    Resource appReserved = resourcesPreemptedForApp.get(app);
+
+    for (RMContainer container : containers) {
+      containersForPreemption.add(container);
+      Resources.addTo(appReserved, container.getAllocatedResource());
+      Resources.addTo(totalResourcesPreempted,
+          container.getAllocatedResource());
+    }
   }
 
   /**
@@ -130,11 +200,50 @@ public class FSSchedulerNode extends SchedulerNode {
   }
 
   /**
-   * Remove container from the set of containers marked for preemption.
-   *
-   * @param container container to remove
+   * The Scheduler has allocated containers on this node to the given
+   * application.
+   * @param rmContainer Allocated container
+   * @param launchedOnNode True if the container has been launched
+   */
+  @Override
+  protected synchronized void allocateContainer(RMContainer rmContainer,
+                                                boolean launchedOnNode) {
+    super.allocateContainer(rmContainer, launchedOnNode);
+    Resource allocated = rmContainer.getAllocatedResource();
+    if (!Resources.isNone(allocated)) {
+      // check for satisfied preemption request and update bookkeeping
+      FSAppAttempt app =
+          appIdToAppMap.get(rmContainer.getApplicationAttemptId());
+      if (app != null) {
+        Resource reserved = resourcesPreemptedForApp.get(app);
+        Resource fulfilled = Resources.componentwiseMin(reserved, allocated);
+        Resources.subtractFrom(reserved, fulfilled);
+        Resources.subtractFrom(totalResourcesPreempted, fulfilled);
+        if (Resources.isNone(reserved)) {
+          // No more preempted containers
+          resourcesPreemptedForApp.remove(app);
+          appIdToAppMap.remove(rmContainer.getApplicationAttemptId());
+        }
+      }
+    } else {
+      LOG.error("Allocated empty container" + rmContainer.getContainerId());
+    }
+  }
+
+  /**
+   * Release an allocated container on this node.
+   * It also removes the container from the preemption list, so the freed
+   * resources can be assigned to the application they were preempted for.
+   * @param containerId ID of container to be released.
+   * @param releasedByNode whether the release originates from a node update.
    */
-  void removeContainerForPreemption(RMContainer container) {
-    containersForPreemption.remove(container);
+  @Override
+  public synchronized void releaseContainer(ContainerId containerId,
+                                            boolean releasedByNode) {
+    RMContainer container = getContainer(containerId);
+    super.releaseContainer(containerId, releasedByNode);
+    if (container != null) {
+      containersForPreemption.remove(container);
+    }
   }
 }
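In the allocateContainer override above, the preemption debt is settled componentwise: the fulfilled amount is the minimum of what was reserved and what was just allocated, per resource type (Resources.componentwiseMin). A small worked example with assumed values:

    /** Worked example of the componentwise settlement in allocateContainer. */
    class SettlementDemo {
      public static void main(String[] args) {
        long[] reserved  = {2048, 2}; // <2048 MB, 2 vcores> still owed
        long[] allocated = {1024, 3}; // container just allocated to the app
        long[] fulfilled = new long[2];
        for (int i = 0; i < fulfilled.length; i++) {
          fulfilled[i] = Math.min(reserved[i], allocated[i]);
          reserved[i] -= fulfilled[i]; // subtractFrom(reserved, fulfilled)
        }
        // reserved is now <1024 MB, 0 vcores>: not "none" yet, so the app
        // stays in resourcesPreemptedForApp until the memory debt is repaid.
        System.out.println(reserved[0] + " MB, " + reserved[1] + " vcores left");
      }
    }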

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 98c14ac..d1a237a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -71,9 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -972,6 +970,31 @@ public class FairScheduler extends
     }
   }
 
+  /**
+   * Assign the resources freed by preemption to the applications they
+   * were preempted for.
+   * @param node Node to check
+   * @return true if any assignment occurred
+   */
+  static boolean assignPreemptedContainers(FSSchedulerNode node) {
+    boolean assignedAny = false;
+    for (Entry<FSAppAttempt, Resource> entry :
+        node.getPreemptionList().entrySet()) {
+      FSAppAttempt app = entry.getKey();
+      Resource preemptionPending = Resources.clone(entry.getValue());
+      while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
+        Resource assigned = app.assignContainer(node);
+        if (Resources.isNone(assigned)) {
+          // Failed to assign; do not try further on this node
+          break;
+        }
+        assignedAny = true;
+        Resources.subtractFromNonNegative(preemptionPending, assigned);
+      }
+    }
+    return assignedAny;
+  }
+
   @VisibleForTesting
   void attemptScheduling(FSSchedulerNode node) {
     try {
@@ -991,11 +1014,17 @@ public class FairScheduler extends
       }
 
       // Assign new containers...
-      // 1. Check for reserved applications
-      // 2. Schedule if there are no reservations
-
-      boolean validReservation = false;
+      // 1. Ensure containers are assigned to the apps that preempted
+      // 2. Check for reserved applications
+      // 3. Schedule if there are no reservations
+
+      // Apps may be waiting for preempted containers. Satisfy them first,
+      // so that when we preempt a container from B on behalf of A, the
+      // freed resources do not end up with some other app C that would
+      // not have qualified for preemption itself.
+      assignPreemptedContainers(node);
       FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+      boolean validReservation = false;
       if (reservedAppSchedulable != null) {
         validReservation = reservedAppSchedulable.assignReservedContainer(node);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
new file mode 100644
index 0000000..3927b00
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test scheduler node, especially preemption reservations.
+ */
+public class TestFSSchedulerNode {
+  private final ArrayList<RMContainer> containers = new ArrayList<>();
+
+  private RMNode createNode() {
+    RMNode node = mock(RMNode.class);
+    when(node.getTotalCapability()).thenReturn(Resource.newInstance(8192, 8));
+    when(node.getHostName()).thenReturn("host.domain.com");
+    return node;
+  }
+
+  private void createDefaultContainer() {
+    createContainer(Resource.newInstance(1024, 1), null);
+  }
+
+  private RMContainer createContainer(
+      Resource request, ApplicationAttemptId appAttemptId) {
+    RMContainer container = mock(RMContainer.class);
+    Container containerInner = mock(Container.class);
+    ContainerId id = mock(ContainerId.class);
+    when(id.getContainerId()).thenReturn((long)containers.size());
+    when(containerInner.getResource()).
+        thenReturn(Resources.clone(request));
+    when(containerInner.getId()).thenReturn(id);
+    when(containerInner.getExecutionType()).
+        thenReturn(ExecutionType.GUARANTEED);
+    when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(container.getContainerId()).thenReturn(id);
+    when(container.getContainer()).thenReturn(containerInner);
+    when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+    when(container.getAllocatedResource()).
+        thenReturn(Resources.clone(request));
+    containers.add(container);
+    return container;
+  }
+
+  private void saturateCluster(FSSchedulerNode schedulerNode) {
+    while (!Resources.isNone(schedulerNode.getUnallocatedResource())) {
+      createDefaultContainer();
+      schedulerNode.allocateContainer(containers.get(containers.size() - 1));
+      schedulerNode.containerStarted(containers.get(containers.size() - 1).
+          getContainerId());
+    }
+  }
+
+  private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode,
+                                         Resource request) {
+    FSAppAttempt starvingApp = mock(FSAppAttempt.class);
+    ApplicationAttemptId appAttemptId =
+        mock(ApplicationAttemptId.class);
+    when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
+        new Answer<Resource>() {
+          @Override
+          public Resource answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+            Resource response = Resource.newInstance(0, 0);
+            while (!Resources.isNone(request) &&
+                !Resources.isNone(schedulerNode.getUnallocatedResource())) {
+              RMContainer container = createContainer(request, appAttemptId);
+              schedulerNode.allocateContainer(container);
+              Resources.addTo(response, container.getAllocatedResource());
+              Resources.subtractFrom(request,
+                  container.getAllocatedResource());
+            }
+            return response;
+          }
+        });
+    when(starvingApp.isStarved()).thenAnswer(
+        new Answer<Boolean>() {
+          @Override
+          public Boolean answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+            return !Resources.isNone(request);
+          }
+        }
+    );
+    when(starvingApp.getPendingDemand()).thenReturn(request);
+    return starvingApp;
+  }
+
+  private void finalValidation(FSSchedulerNode schedulerNode) {
+    assertEquals("Everything should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    assertTrue("No containers should be reserved for preemption",
+        schedulerNode.containersForPreemption.isEmpty());
+    assertTrue("No resources should be reserved for preemptors",
+        schedulerNode.resourcesPreemptedForApp.isEmpty());
+    assertEquals(
+        "No amount of resource should be reserved for preemptees",
+        Resources.none(),
+        schedulerNode.getTotalReserved());
+  }
+
+  private void allocateContainers(FSSchedulerNode schedulerNode) {
+    FairScheduler.assignPreemptedContainers(schedulerNode);
+  }
+
+  /**
+   * Allocate and release a single container.
+   */
+  @Test
+  public void testSimpleAllocation() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    createDefaultContainer();
+    assertEquals("Nothing should have been allocated, yet",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    schedulerNode.allocateContainer(containers.get(0));
+    assertEquals("Container should be allocated",
+        containers.get(0).getContainer().getResource(),
+        schedulerNode.getAllocatedResource());
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    assertEquals("Everything should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+
+    // Check that a double release is handled gracefully
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Allocate and release three containers with launch.
+   */
+  @Test
+  public void testMultipleAllocations() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    createDefaultContainer();
+    createDefaultContainer();
+    createDefaultContainer();
+    assertEquals("Nothing should have been allocated, yet",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    schedulerNode.allocateContainer(containers.get(0));
+    schedulerNode.containerStarted(containers.get(0).getContainerId());
+    schedulerNode.allocateContainer(containers.get(1));
+    schedulerNode.containerStarted(containers.get(1).getContainerId());
+    schedulerNode.allocateContainer(containers.get(2));
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(), 3.0),
+        schedulerNode.getAllocatedResource());
+    schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+    schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Preempt a single container and reassign it to a starving app.
+   */
+  @Test
+  public void testSimplePreemption() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Request preemption
+    FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+        Resource.newInstance(1024, 1));
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp);
+    assertEquals(
+        "No resource amount should be reserved for preemptees",
+        containers.get(0).getAllocatedResource(),
+        schedulerNode.getTotalReserved());
+
+    // Preemption occurs: release one container
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    allocateContainers(schedulerNode);
+    assertEquals("Container should be allocated",
+        schedulerNode.getTotalResource(),
+        schedulerNode.getAllocatedResource());
+
+    // Release all remaining containers
+    for (int i = 1; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Allocate and release three containers requested by two apps.
+   */
+  @Test
+  public void testComplexPreemption() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Preempt a container
+    FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
+        Resource.newInstance(2048, 2));
+    FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
+        Resource.newInstance(1024, 1));
+
+    // Preemption thread kicks in
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp1);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(1)), starvingApp1);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(2)), starvingApp2);
+
+    // Preemption happens
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+    schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+
+    allocateContainers(schedulerNode);
+    assertEquals("Container should be allocated",
+        schedulerNode.getTotalResource(),
+        schedulerNode.getAllocatedResource());
+
+    // Release all containers
+    for (int i = 3; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Allocate and release three containers requested by two apps in two rounds.
+   */
+  @Test
+  public void testMultiplePreemptionEvents() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Preempt a container
+    FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
+        Resource.newInstance(2048, 2));
+    FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
+        Resource.newInstance(1024, 1));
+
+    // Preemption thread kicks in
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp1);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(1)), starvingApp1);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(2)), starvingApp2);
+
+    // Preemption happens
+    schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+    allocateContainers(schedulerNode);
+
+    schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    allocateContainers(schedulerNode);
+
+    assertEquals("Container should be allocated",
+        schedulerNode.getTotalResource(),
+        schedulerNode.getAllocatedResource());
+
+    // Release all containers
+    for (int i = 3; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Allocate and release a single container and stop the app in between.
+   */
+  @Test
+  public void testPreemptionToCompletedApp() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Preempt a container
+    FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+        Resource.newInstance(1024, 1));
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp);
+
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+
+    // Stop the application, then try to satisfy the reservation,
+    // and observe that the freed resources are not allocated to
+    // the stopped app
+    when(starvingApp.isStopped()).thenReturn(true);
+    allocateContainers(schedulerNode);
+    assertNotEquals("Container should be allocated",
+        schedulerNode.getTotalResource(),
+        schedulerNode.getAllocatedResource());
+
+    // Release all containers
+    for (int i = 1; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+  /**
+   * Preempt a bigger container than the preemption request.
+   */
+  @Test
+  public void testPartialReservedPreemption() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Preempt a container
+    Resource originalStarvingAppDemand = Resource.newInstance(512, 1);
+    FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+        originalStarvingAppDemand);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp);
+
+    // Preemption occurs
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+
+    // Container partially reassigned
+    allocateContainers(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.subtract(schedulerNode.getTotalResource(),
+            Resource.newInstance(512, 0)),
+        schedulerNode.getAllocatedResource());
+
+    // Cleanup simulating node update
+    schedulerNode.getPreemptionList();
+
+    // Release all containers
+    for (int i = 1; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 3940a47..59d243b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -294,11 +294,30 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
         8 - 2 * numStarvedAppContainers,
         greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
 
+    // Verify the node is reserved for the starvingApp
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertTrue("node should be reserved for the starvingApp",
+            node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
+
     sendEnoughNodeUpdatesToAssignFully();
 
     // Verify the preempted containers are assigned to starvingApp
     assertEquals("Starved app is not assigned the right # of containers",
         numStarvedAppContainers, starvingApp.getLiveContainers().size());
+
+    // Verify the node is not reserved for the starvingApp anymore
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
   }
 
   private void verifyNoPreemption() throws InterruptedException {

