You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/12/06 08:13:40 UTC

[38/38] hadoop git commit: YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh)

YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/742632e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/742632e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/742632e3

Branch: refs/heads/yarn-2877
Commit: 742632e346604fd2b263bd42367165638fcf2416
Parents: 42d4901
Author: Arun Suresh <as...@apache.org>
Authored: Sat Dec 5 21:26:16 2015 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Sat Dec 5 21:26:16 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reservation/CapacityOverTimePolicy.java     |  52 +++++++-
 .../reservation/InMemoryPlan.java               | 123 ++++++++++++++++++-
 .../InMemoryReservationAllocation.java          |  13 +-
 .../reservation/NoOverCommitPolicy.java         |   8 ++
 .../resourcemanager/reservation/PlanView.java   |  65 ++++++++--
 .../reservation/ReservationAllocation.java      |  12 +-
 .../reservation/SharingPolicy.java              |  24 +++-
 .../reservation/planning/IterativePlanner.java  |  16 +--
 .../reservation/planning/PlanningAlgorithm.java |  34 ++---
 .../reservation/planning/StageAllocator.java    |   6 +-
 .../planning/StageAllocatorGreedy.java          |  23 ++--
 .../planning/StageAllocatorLowCostAligned.java  |  14 ++-
 .../reservation/TestInMemoryPlan.java           |  72 ++++++-----
 .../planning/TestGreedyReservationAgent.java    |  94 +++++++++++++-
 15 files changed, 464 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 90ada4b..1fed6a6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -592,6 +592,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4405. Support node label store in non-appendable file system. (Wangda
     Tan via jianhe)
 
+    YARN-4358. Reservation System: Improve relationship between SharingPolicy
+    and ReservationAgent. (Carlo Curino via asuresh)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index afba7ea..424b543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import java.util.Date;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
@@ -104,14 +108,17 @@ public class CapacityOverTimePolicy implements SharingPolicy {
     IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
     maxAllowed.multiplyBy(validWindow / step);
 
+    RLESparseResourceAllocation userCons =
+        plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
+            - validWindow, endTime + validWindow);
+
     // check that the resources offered to the user during any window of length
     // "validWindow" overlapping this allocation are within maxAllowed
     // also enforce instantaneous and physical constraints during this pass
     for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
 
       Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
-      Resource currExistingAllocForUser =
-          plan.getConsumptionForUser(reservation.getUser(), t);
+      Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
       Resource currNewAlloc = reservation.getResourcesAtTime(t);
       Resource currOldAlloc = Resources.none();
       if (oldReservation != null) {
@@ -163,8 +170,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
 
       // expire contributions from instant in time before (t - validWindow)
       if (t > startTime) {
-        Resource pastOldAlloc =
-            plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
+        Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
         Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
 
         // runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
@@ -189,6 +195,39 @@ public class CapacityOverTimePolicy implements SharingPolicy {
   }
 
   @Override
+  public RLESparseResourceAllocation availableResources(
+      RLESparseResourceAllocation available, Plan plan, String user,
+      ReservationId oldId, long start, long end) throws PlanningException {
+
+    // this only propagates the instantaneous maxInst properties, while
+    // the time-varying ones depend on the current allocation as well
+    // and are not easily captured here
+    Resource planTotalCapacity = plan.getTotalCapacity();
+    Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+    NavigableMap<Long, Resource> instQuota = new TreeMap<Long, Resource>();
+    instQuota.put(start, maxInsRes);
+
+    RLESparseResourceAllocation instRLEQuota =
+        new RLESparseResourceAllocation(instQuota,
+            plan.getResourceCalculator());
+
+    RLESparseResourceAllocation used =
+        plan.getConsumptionForUserOverTime(user, start, end);
+
+    instRLEQuota =
+        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+            planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
+            end);
+
+    instRLEQuota =
+        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+            planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
+            end);
+
+    return instRLEQuota;
+  }
+
+  @Override
   public long getValidWindow() {
     return validWindow;
   }
@@ -198,7 +237,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
    * long(s), as using Resource to store the "integral" of the allocation over
    * time leads to integer overflows for large allocations/clusters. (Evolving
    * Resource to use long is too disruptive at this point.)
-   * 
+   *
    * The comparison/multiplication behaviors of IntegralResource are consistent
    * with the DefaultResourceCalculator.
    */
@@ -244,4 +283,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
       return "<memory:" + memory + ", vCores:" + vcores + ">";
     }
   }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index af42df9..c51c3ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -27,11 +27,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@@ -65,6 +67,9 @@ public class InMemoryPlan implements Plan {
   private Map<String, RLESparseResourceAllocation> userResourceAlloc =
       new HashMap<String, RLESparseResourceAllocation>();
 
+  private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
+      new HashMap<String, RLESparseResourceAllocation>();
+
   private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
       new HashMap<ReservationId, InMemoryReservationAllocation>();
 
@@ -121,6 +126,7 @@ public class InMemoryPlan implements Plan {
     return queueMetrics;
   }
 
+
   private void incrementAllocation(ReservationAllocation reservation) {
     assert (readWriteLock.isWriteLockedByCurrentThread());
     Map<ReservationInterval, Resource> allocationRequests =
@@ -132,11 +138,27 @@ public class InMemoryPlan implements Plan {
       resAlloc = new RLESparseResourceAllocation(resCalc);
       userResourceAlloc.put(user, resAlloc);
     }
+    RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
+    if (resCount == null) {
+      resCount = new RLESparseResourceAllocation(resCalc);
+      userActiveReservationCount.put(user, resCount);
+    }
+
+    long earliestActive = Long.MAX_VALUE;
+    long latestActive = Long.MIN_VALUE;
+
     for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
       resAlloc.addInterval(r.getKey(), r.getValue());
       rleSparseVector.addInterval(r.getKey(), r.getValue());
+      if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+          ZERO_RESOURCE)) {
+        earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+        latestActive = Math.max(latestActive, r.getKey().getEndTime());
+      }
     }
+    resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
+        Resource.newInstance(1, 1));
   }
 
   private void decrementAllocation(ReservationAllocation reservation) {
@@ -145,14 +167,29 @@ public class InMemoryPlan implements Plan {
         reservation.getAllocationRequests();
     String user = reservation.getUser();
     RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+
+    long earliestActive = Long.MAX_VALUE;
+    long latestActive = Long.MIN_VALUE;
     for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
         .entrySet()) {
       resAlloc.removeInterval(r.getKey(), r.getValue());
       rleSparseVector.removeInterval(r.getKey(), r.getValue());
+      if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+          ZERO_RESOURCE)) {
+        earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+        latestActive = Math.max(latestActive, r.getKey().getEndTime());
+      }
     }
     if (resAlloc.isEmpty()) {
       userResourceAlloc.remove(user);
     }
+
+    RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
+    resCount.removeInterval(new ReservationInterval(earliestActive,
+        latestActive), Resource.newInstance(1, 1));
+    if (resCount.isEmpty()) {
+      userActiveReservationCount.remove(user);
+    }
   }
 
   public Set<ReservationAllocation> getAllReservations() {
@@ -160,9 +197,9 @@ public class InMemoryPlan implements Plan {
     try {
       if (currentReservations != null) {
         Set<ReservationAllocation> flattenedReservations =
-            new HashSet<ReservationAllocation>();
-        for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
-            .values()) {
+            new TreeSet<ReservationAllocation>();
+        for (Set<InMemoryReservationAllocation> reservationEntries :
+            currentReservations.values()) {
           flattenedReservations.addAll(reservationEntries);
         }
         return flattenedReservations;
@@ -417,14 +454,34 @@ public class InMemoryPlan implements Plan {
   }
 
   @Override
-  public Resource getConsumptionForUser(String user, long t) {
+  public RLESparseResourceAllocation getReservationCountForUserOverTime(
+      String user, long start, long end) {
+    readLock.lock();
+    try {
+      RLESparseResourceAllocation userResAlloc =
+          userActiveReservationCount.get(user);
+
+      if (userResAlloc != null) {
+        return userResAlloc.getRangeOverlapping(start, end);
+      } else {
+        return new RLESparseResourceAllocation(resCalc);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+      long start, long end) {
     readLock.lock();
     try {
       RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
+
       if (userResAlloc != null) {
-        return userResAlloc.getCapacityAtTime(t);
+        return userResAlloc.getRangeOverlapping(start, end);
       } else {
-        return Resources.clone(ZERO_RESOURCE);
+        return new RLESparseResourceAllocation(resCalc);
       }
     } finally {
       readLock.unlock();
@@ -465,6 +522,43 @@ public class InMemoryPlan implements Plan {
   }
 
   @Override
+  public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+      ReservationId oldId, long start, long end) throws PlanningException {
+    readLock.lock();
+    try {
+      // create RLE of totCapacity
+      TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
+      totAvailable.put(start, Resources.clone(totalCapacity));
+      RLESparseResourceAllocation totRLEAvail =
+          new RLESparseResourceAllocation(totAvailable, resCalc);
+
+      // subtract used from available
+      RLESparseResourceAllocation netAvailable;
+
+      netAvailable =
+          RLESparseResourceAllocation.merge(resCalc,
+              Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
+              RLEOperator.subtractTestNonNegative, start, end);
+
+      // add back in old reservation used resources if any
+      ReservationAllocation old = reservationTable.get(oldId);
+      if (old != null) {
+        netAvailable =
+            RLESparseResourceAllocation.merge(resCalc,
+                Resources.clone(totalCapacity), netAvailable,
+                old.getResourcesOverTime(), RLEOperator.add, start, end);
+      }
+      // lower it if this is needed by the sharing policy
+      netAvailable =
+          getSharingPolicy().availableResources(netAvailable, this, user,
+              oldId, start, end);
+      return netAvailable;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public Resource getMinimumAllocation() {
     return Resources.clone(minAlloc);
   }
@@ -549,4 +643,21 @@ public class InMemoryPlan implements Plan {
     }
   }
 
+  @Override
+  public Set<ReservationAllocation> getReservationByUserAtTime(String user,
+      long t) {
+    readLock.lock();
+    try {
+      Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
+      for (ReservationAllocation ra : getReservationsAtTime(t)) {
+        String resUser = ra.getUser();
+        if (resUser != null && resUser.equals(user)) {
+          resSet.add(ra);
+        }
+      }
+      return resSet;
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 55ab066..69fd43f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -133,11 +133,16 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
   }
 
   @Override
+  public RLESparseResourceAllocation getResourcesOverTime(){
+    return resourcesOverTime;
+  }
+
+  @Override
   public String toString() {
     StringBuilder sBuf = new StringBuilder();
     sBuf.append(getReservationId()).append(" user:").append(getUser())
         .append(" startTime: ").append(getStartTime()).append(" endTime: ")
-        .append(getEndTime()).append(" alloc:[")
+        .append(getEndTime()).append(" alloc:\n[")
         .append(resourcesOverTime.toString()).append("] ");
     return sBuf.toString();
   }
@@ -151,6 +156,12 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
     if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
       return 1;
     }
+    if (this.getReservationId().getId() > other.getReservationId().getId()) {
+      return -1;
+    }
+    if (this.getReservationId().getId() < other.getReservationId().getId()) {
+      return 1;
+    }
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index f87e9dc..119520b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@@ -89,4 +90,11 @@ public class NoOverCommitPolicy implements SharingPolicy {
     // nothing to do for this policy
   }
 
+  @Override
+  public RLESparseResourceAllocation availableResources(
+      RLESparseResourceAllocation available, Plan plan, String user,
+      ReservationId oldId, long start, long end) throws PlanningException {
+    return available;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 66c66ca..f57c2e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 
 import java.util.Set;
 
@@ -41,6 +42,17 @@ public interface PlanView extends PlanContext {
   public ReservationAllocation getReservationById(ReservationId reservationID);
 
   /**
+   * Return a set of {@link ReservationAllocation}s that belong to a certain
+   * user and overlap time t.
+   *
+   * @param user the user being considered
+   * @param t the instant in time being considered
+   * @return {@code Set<ReservationAllocation>} for this user at this time
+   */
+  public Set<ReservationAllocation> getReservationByUserAtTime(String user,
+      long t);
+
+  /**
    * Gets all the active reservations at the specified point of time
    * 
    * @param tick the time (UTC in ms) for which the active reservations are
@@ -68,18 +80,6 @@ public interface PlanView extends PlanContext {
   Resource getTotalCommittedResources(long tick);
 
   /**
-   * Returns the total {@link Resource} reserved for a given user at the
-   * specified time
-   * 
-   * @param user the user who made the reservation(s)
-   * @param tick the time (UTC in ms) for which the reserved resources are
-   *          requested
-   * @return the total {@link Resource} reserved for a given user at the
-   *         specified time
-   */
-  public Resource getConsumptionForUser(String user, long tick);
-
-  /**
    * Returns the overall capacity in terms of {@link Resource} assigned to this
    * plan (typically will correspond to the absolute capacity of the
    * corresponding queue).
@@ -98,9 +98,48 @@ public interface PlanView extends PlanContext {
 
   /**
    * Returns the time (UTC in ms) at which the last reservation terminates
-   * 
+   *
    * @return the time (UTC in ms) at which the last reservation terminates
    */
   public long getLastEndTime();
 
+  /**
+   * This method returns the amount of resources available to a given user
+   * (optionally if removing a certain reservation) over the start-end time
+   * range.
+   *
+   * @param user
+   * @param oldId
+   * @param start
+   * @param end
+   * @return a view of the plan as it is available to this user
+   * @throws PlanningException
+   */
+  public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+      ReservationId oldId, long start, long end) throws PlanningException;
+
+  /**
+   * This method returns a RLE encoded view of the user reservation count
+   * utilization between start and end time.
+   *
+   * @param user
+   * @param start
+   * @param end
+   * @return RLE encoded view of reservation used over time
+   */
+  public RLESparseResourceAllocation getReservationCountForUserOverTime(
+      String user, long start, long end);
+
+  /**
+   * This method returns a RLE encoded view of the user reservation utilization
+   * between start and end time.
+   *
+   * @param user
+   * @param start
+   * @param end
+   * @return RLE encoded view of resources used over time
+   */
+  public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+      long start, long end);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 0d3c692..0da95ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -50,14 +50,14 @@ public interface ReservationAllocation extends
   public ReservationDefinition getReservationDefinition();
 
   /**
-   * Returns the time at which the reservation is activated
+   * Returns the time at which the reservation is activated.
    * 
    * @return the time at which the reservation is activated
    */
   public long getStartTime();
 
   /**
-   * Returns the time at which the reservation terminates
+   * Returns the time at which the reservation terminates.
    * 
    * @return the time at which the reservation terminates
    */
@@ -65,7 +65,7 @@ public interface ReservationAllocation extends
 
   /**
    * Returns the map of resources requested against the time interval for which
-   * they were
+   * they were requested.
    * 
    * @return the allocationRequests the map of resources requested against the
    *         time interval for which they were
@@ -118,4 +118,10 @@ public interface ReservationAllocation extends
    */
   public Resource getResourcesAtTime(long tick);
 
+  /**
+   * Return a RLE representation of used resources.
+   * @return a RLE encoding of resources allocated over time.
+   */
+  public RLESparseResourceAllocation getResourcesOverTime();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index 8f8d24c..e458055 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 
 /**
@@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 public interface SharingPolicy {
 
   /**
-   * Initialize this policy
+   * Initialize this policy.
    * 
    * @param planQueuePath the name of the queue for this plan
    * @param conf the system configuration
@@ -54,6 +55,26 @@ public interface SharingPolicy {
       throws PlanningException;
 
   /**
+   * This method provides a (partial) instantaneous validation by applying
+   * business rules (such as max number of parallel containers allowed for a
+   * user). To provide the agent with more feedback the returned parameter is
+   * expressed in number of containers that can be fit in this time according to
+   * the business rules.
+   *
+   * @param available the amount of resources that would be offered if not
+   *          constrained by the policy
+   * @param plan reference to the current Plan
+   * @param user the username
+   * @param start the start time for the range we are querying
+   * @param end the end time for the range we are querying
+   * @param oldId (optional) the id of a reservation being updated
+   * @throws PlanningException thrown if the request is not valid
+   */
+  public RLESparseResourceAllocation availableResources(
+      RLESparseResourceAllocation available, Plan plan, String user,
+      ReservationId oldId, long start, long end) throws PlanningException;
+
+  /**
    * Returns the time range before and after the current reservation considered
    * by this policy. In particular, this informs the archival process for the
    * {@link Plan}, i.e., reservations regarding times before (now - validWindow)
@@ -63,4 +84,5 @@ public interface SharingPolicy {
    */
   public long getValidWindow();
 
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index d05b0ef..77362d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResour
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -80,8 +81,8 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   @Override
   public RLESparseResourceAllocation computeJobAllocation(Plan plan,
-      ReservationId reservationId, ReservationDefinition reservation)
-      throws ContractValidationException {
+      ReservationId reservationId, ReservationDefinition reservation,
+      String user) throws PlanningException {
 
     // Initialize
     initialize(plan, reservation);
@@ -142,7 +143,7 @@ public class IterativePlanner extends PlanningAlgorithm {
       // Compute the allocation of a single stage
       Map<ReservationInterval, Resource> curAlloc =
           computeStageAllocation(plan, currentReservationStage,
-              stageArrivalTime, stageDeadline);
+              stageArrivalTime, stageDeadline, user, reservationId);
 
       // If we did not find an allocation, return NULL
       // (unless it's an ANY job, then we simply continue).
@@ -159,8 +160,8 @@ public class IterativePlanner extends PlanningAlgorithm {
       }
 
       // Get the start & end time of the current allocation
-      Long stageStartTime = findEarliestTime(curAlloc.keySet());
-      Long stageEndTime = findLatestTime(curAlloc.keySet());
+      Long stageStartTime = findEarliestTime(curAlloc);
+      Long stageEndTime = findLatestTime(curAlloc);
 
       // If we did find an allocation for the stage, add it
       for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@@ -310,10 +311,11 @@ public class IterativePlanner extends PlanningAlgorithm {
   // Call algStageAllocator
   protected Map<ReservationInterval, Resource> computeStageAllocation(
       Plan plan, ReservationRequest rr, long stageArrivalTime,
-      long stageDeadline) {
+      long stageDeadline, String user, ReservationId oldId)
+      throws PlanningException {
 
     return algStageAllocator.computeStageAllocation(plan, planLoads,
-        planModifications, rr, stageArrivalTime, stageDeadline);
+        planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 8b72b9f..e1b508d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -62,7 +62,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
 
     // Compute the job allocation
     RLESparseResourceAllocation allocation =
-        computeJobAllocation(plan, reservationId, adjustedContract);
+        computeJobAllocation(plan, reservationId, adjustedContract, user);
 
     // If no job allocation was found, fail
     if (allocation == null) {
@@ -84,8 +84,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
             adjustedContract, // Contract
             user, // User name
             plan.getQueueName(), // Queue name
-            findEarliestTime(mapAllocations.keySet()), // Earliest start time
-            findLatestTime(mapAllocations.keySet()), // Latest end time
+            findEarliestTime(mapAllocations), // Earliest start time
+            findLatestTime(mapAllocations), // Latest end time
             mapAllocations, // Allocations
             plan.getResourceCalculator(), // Resource calculator
             plan.getMinimumAllocation()); // Minimum allocation
@@ -111,14 +111,14 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
     Resource zeroResource = Resource.newInstance(0, 0);
 
     // Pad at the beginning
-    long earliestStart = findEarliestTime(mapAllocations.keySet());
+    long earliestStart = findEarliestTime(mapAllocations);
     if (jobArrival < earliestStart) {
       mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
           zeroResource);
     }
 
     // Pad at the beginning
-    long latestEnd = findLatestTime(mapAllocations.keySet());
+    long latestEnd = findLatestTime(mapAllocations);
     if (latestEnd < jobDeadline) {
       mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
           zeroResource);
@@ -129,8 +129,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
   }
 
   public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
-      ReservationId reservationId, ReservationDefinition reservation)
-      throws PlanningException, ContractValidationException;
+      ReservationId reservationId, ReservationDefinition reservation,
+      String user) throws PlanningException, ContractValidationException;
 
   @Override
   public boolean createReservation(ReservationId reservationId, String user,
@@ -162,24 +162,26 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
 
   }
 
-  protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+  protected static long findEarliestTime(
+      Map<ReservationInterval, Resource> sesInt) {
 
     long ret = Long.MAX_VALUE;
-    for (ReservationInterval s : sesInt) {
-      if (s.getStartTime() < ret) {
-        ret = s.getStartTime();
+    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
+      if (s.getKey().getStartTime() < ret && s.getValue() != null) {
+        ret = s.getKey().getStartTime();
       }
     }
     return ret;
 
   }
 
-  protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+  protected static long findLatestTime(Map<ReservationInterval,
+      Resource> sesInt) {
 
     long ret = Long.MIN_VALUE;
-    for (ReservationInterval s : sesInt) {
-      if (s.getEndTime() > ret) {
-        ret = s.getEndTime();
+    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
+      if (s.getKey().getEndTime() > ret && s.getValue() != null) {
+        ret = s.getKey().getEndTime();
       }
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index 9df6b74..b95f8d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 
 /**
  * Interface for allocating a single stage in IterativePlanner.
@@ -46,10 +48,12 @@ public interface StageAllocator {
    *
    * @return The computed allocation (or null if the stage could not be
    *         allocated)
+   * @throws PlanningException
    */
   Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       Map<Long, Resource> planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline);
+      long stageEarliestStart, long stageDeadline, String user,
+      ReservationId oldId) throws PlanningException;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index 773fbdf..c836970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -40,7 +43,8 @@ public class StageAllocatorGreedy implements StageAllocator {
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       Map<Long, Resource> planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline) {
+      long stageEarliestStart, long stageDeadline, String user,
+      ReservationId oldId) throws PlanningException {
 
     Resource totalCapacity = plan.getTotalCapacity();
 
@@ -63,6 +67,15 @@ public class StageAllocatorGreedy implements StageAllocator {
 
     int maxGang = 0;
 
+    RLESparseResourceAllocation netAvailable =
+        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+            stageDeadline);
+
+    netAvailable =
+        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+            plan.getTotalCapacity(), netAvailable, planModifications,
+            RLEOperator.subtract, stageEarliestStart, stageDeadline);
+
     // loop trying to place until we are done, or we are considering
     // an invalid range of times
     while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
@@ -79,13 +92,7 @@ public class StageAllocatorGreedy implements StageAllocator {
       for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
           && maxGang > 0; t = t - plan.getStep()) {
 
-        // compute net available resources
-        Resource netAvailableRes = Resources.clone(totalCapacity);
-        // Resources.addTo(netAvailableRes, oldResCap);
-        Resources.subtractFrom(netAvailableRes,
-            plan.getTotalCommittedResources(t));
-        Resources.subtractFrom(netAvailableRes,
-            planModifications.getCapacityAtTime(t));
+        Resource netAvailableRes = netAvailable.getCapacityAtTime(t);
 
         // compute maximum number of gangs we could fit
         curMaxGang =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index 04cce7b..b9fd8e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@@ -60,7 +61,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
   public Map<ReservationInterval, Resource> computeStageAllocation(
       Plan plan, Map<Long, Resource> planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline) {
+      long stageEarliestStart, long stageDeadline, String user,
+      ReservationId oldId) {
 
     // Initialize
     ResourceCalculator resCalc = plan.getResourceCalculator();
@@ -136,7 +138,9 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
       DurationInterval bestDurationInterval =
           durationIntervalsSortedByCost.first();
       int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
-
+      numGangsToAllocate =
+          Math.min(numGangsToAllocate,
+              bestDurationInterval.numCanFit(gang, capacity, resCalc));
       // Add it
       remainingGangs -= numGangsToAllocate;
 
@@ -355,5 +359,11 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
       this.cost = value;
     }
 
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" start: " + startTime).append(" end: " + endTime)
+          .append(" cost: " + cost).append(" maxLoad: " + maxLoad);
+      return sb.toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 2e262a0..1756e86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -118,11 +118,18 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
+    checkAllocation(plan, alloc, start);
+  }
+
+  private void checkAllocation(Plan plan, int[] alloc, int start) {
+    RLESparseResourceAllocation userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
+
     for (int i = 0; i < alloc.length; i++) {
       Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
           plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          plan.getConsumptionForUser(user, start + i));
+          userCons.getCapacityAtTime(start + i));
     }
   }
 
@@ -180,12 +187,7 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
-    for (int i = 0; i < alloc.length; i++) {
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          plan.getConsumptionForUser(user, start + i));
-    }
+    checkAllocation(plan, alloc, start);
 
     // Try to add it again
     try {
@@ -226,11 +228,14 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
+
+    RLESparseResourceAllocation userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
     for (int i = 0; i < alloc.length; i++) {
       Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
           plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
-          plan.getConsumptionForUser(user, start + i));
+          userCons.getCapacityAtTime(start + i));
     }
 
     // Now update it
@@ -252,13 +257,18 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
+
+    userCons =
+        plan.getConsumptionForUserOverTime(user, start, start
+            + updatedAlloc.length);
+
     for (int i = 0; i < updatedAlloc.length; i++) {
       Assert.assertEquals(
-          Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+     Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
               + i), plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(
           Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
-              + i), plan.getConsumptionForUser(user, start + i));
+              + i), userCons.getCapacityAtTime(start + i));
     }
   }
 
@@ -321,13 +331,17 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
+
+    RLESparseResourceAllocation userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
+
     for (int i = 0; i < alloc.length; i++) {
       Assert.assertEquals(
           Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
           plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(
           Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
-          plan.getConsumptionForUser(user, start + i));
+          userCons.getCapacityAtTime(start + i));
     }
 
     // Now delete it
@@ -337,11 +351,13 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     Assert.assertNull(plan.getReservationById(reservationID));
+    userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
     for (int i = 0; i < alloc.length; i++) {
       Assert.assertEquals(Resource.newInstance(0, 0),
           plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(Resource.newInstance(0, 0),
-          plan.getConsumptionForUser(user, start + i));
+          userCons.getCapacityAtTime(start + i));
     }
   }
 
@@ -393,14 +409,8 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     doAssertions(plan, rAllocation);
-    for (int i = 0; i < alloc1.length; i++) {
-      Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
-          plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
-          plan.getConsumptionForUser(user, start + i));
-    }
+    checkAllocation(plan, alloc1, start);
+
 
     // Now add another one
     ReservationId reservationID2 =
@@ -424,13 +434,17 @@ public class TestInMemoryPlan {
       Assert.fail(e.getMessage());
     }
     Assert.assertNotNull(plan.getReservationById(reservationID2));
+
+    RLESparseResourceAllocation userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
+
     for (int i = 0; i < alloc2.length; i++) {
       Assert.assertEquals(
           Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
               + alloc2[i] + i), plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(
           Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
-              + alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+              + alloc2[i] + i), userCons.getCapacityAtTime(start + i));
     }
 
     // Now archive completed reservations
@@ -445,14 +459,8 @@ public class TestInMemoryPlan {
     }
     Assert.assertNotNull(plan.getReservationById(reservationID1));
     Assert.assertNull(plan.getReservationById(reservationID2));
-    for (int i = 0; i < alloc1.length; i++) {
-      Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
-          plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(
-          Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
-          plan.getConsumptionForUser(user, start + i));
-    }
+    checkAllocation(plan, alloc1, start);
+
     when(clock.getTime()).thenReturn(107L);
     try {
       // will remove 1st reservation also as it has fallen out of the archival
@@ -461,12 +469,16 @@ public class TestInMemoryPlan {
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
+
+    userCons =
+        plan.getConsumptionForUserOverTime(user, start, start + alloc1.length);
+
     Assert.assertNull(plan.getReservationById(reservationID1));
     for (int i = 0; i < alloc1.length; i++) {
       Assert.assertEquals(Resource.newInstance(0, 0),
           plan.getTotalCommittedResources(start + i));
       Assert.assertEquals(Resource.newInstance(0, 0),
-          plan.getConsumptionForUser(user, start + i));
+          userCons.getCapacityAtTime(start + i));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index cb4eaeb..f81e7ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -86,6 +89,7 @@ public class TestGreedyReservationAgent {
             instConstraint, avgConstraint);
     CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
     policy.init(reservationQ, conf);
+
     agent = new GreedyReservationAgent();
 
     QueueMetrics queueMetrics = mock(QueueMetrics.class);
@@ -135,6 +139,94 @@ public class TestGreedyReservationAgent {
 
   }
 
+  @SuppressWarnings("javadoc")
+  @Test
+  public void testSharingPolicyFeedback() throws PlanningException {
+
+    prepareBasicPlan();
+
+    // let's constrain the instantaneous allocation and see the
+    // policy kicking in during planning
+    float instConstraint = 40;
+    float avgConstraint = 40;
+
+    ReservationSchedulerConfiguration conf =
+        ReservationSystemTestUtil.createConf(plan.getQueueName(), 100000,
+            instConstraint, avgConstraint);
+
+    plan.getSharingPolicy().init(plan.getQueueName(), conf);
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(5 * step);
+    rr.setDeadline(100 * step);
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
+            10 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID, "u3", plan, rr);
+
+    ReservationId reservationID2 =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID2, "u3", plan, rr);
+
+    ReservationDefinition rr3 = new ReservationDefinitionPBImpl();
+    rr3.setArrival(5 * step);
+    rr3.setDeadline(100 * step);
+    ReservationRequest r3 =
+        ReservationRequest.newInstance(Resource.newInstance(2048, 2), 45, 45,
+            10 * step);
+    ReservationRequests reqs3 = new ReservationRequestsPBImpl();
+    reqs3.setReservationResources(Collections.singletonList(r3));
+    rr3.setReservationRequests(reqs3);
+
+    ReservationId reservationID3 =
+        ReservationSystemTestUtil.getNewReservationId();
+    try {
+      // RR3 is simply too big to fit
+      agent.createReservation(reservationID3, "u3", plan, rr3);
+      fail();
+    } catch (PlanningException pe) {
+      // expected
+    }
+
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 4);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+    ReservationAllocation cs2 = plan.getReservationById(reservationID2);
+    ReservationAllocation cs3 = plan.getReservationById(reservationID3);
+
+    assertNotNull(cs);
+    assertNotNull(cs2);
+    assertNull(cs3);
+
+    System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+    for (long i = 90 * step; i < 100 * step; i++) {
+      assertTrue(
+          "Agent-based allocation unexpected",
+          Resources.equals(cs.getResourcesAtTime(i),
+              Resource.newInstance(2048 * 20, 2 * 20)));
+    }
+    // RR2 is pushed out by the presence of RR
+    for (long i = 80 * step; i < 90 * step; i++) {
+      assertTrue(
+          "Agent-based allocation unexpected",
+          Resources.equals(cs2.getResourcesAtTime(i),
+              Resource.newInstance(2048 * 20, 2 * 20)));
+    }
+  }
+
   @Test
   public void testOrder() throws PlanningException {
     prepareBasicPlan();
@@ -186,7 +278,6 @@ public class TestGreedyReservationAgent {
     assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
     assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
     assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
-
     System.out.println("--------AFTER ORDER ALLOCATION (queue: "
         + reservationID + ")----------");
     System.out.println(plan.toString());
@@ -376,7 +467,6 @@ public class TestGreedyReservationAgent {
     ReservationAllocation cs = plan.getReservationById(reservationID);
 
     assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
-
     System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
         + ")----------");
     System.out.println(plan.toString());