You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:33 UTC

[18/33] git commit: YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and Subru Krishnan. (cherry picked from commit aef7928899b37262773f3dc117157bb746bf8918) (cherry picked from commi

YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and Subru Krishnan.
(cherry picked from commit aef7928899b37262773f3dc117157bb746bf8918)
(cherry picked from commit f66ffcf832235e0da0bb050fff08e248b547c360)
(cherry picked from commit 6a3c167175ba98e002b6a67ce73355974c1263da)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ee027b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ee027b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ee027b9

Branch: refs/heads/branch-2.6
Commit: 4ee027b9d6d38f50a075addaf51159ae2cdaca65
Parents: 056b7f5
Author: carlo curino <Carlo Curino>
Authored: Mon Sep 15 16:56:28 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:29:13 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |  12 +-
 .../reservation/GreedyReservationAgent.java     | 367 ++++++++++++
 .../reservation/ReservationAgent.java           |  55 ++
 .../exceptions/ContractValidationException.java |  12 +
 .../reservation/TestGreedyReservationAgent.java | 588 +++++++++++++++++++
 5 files changed, 1031 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee027b9/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index 410d974..deece7c 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -4,7 +4,13 @@ CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
 YARN-2475. Logic for responding to capacity drops for the 
 ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
 
-YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
+YARN-1708. Public YARN APIs for creating/updating/deleting 
+reservations. (Carlo Curino and Subru Krishnan via subru)
 
-YARN-1709. In-memory data structures used to track resources over time to
-enable reservations. (subru)
+YARN-1709. In-memory data structures used to track resources over
+time to enable reservations. (Carlo Curino and Subru Krishnan via 
+subru)
+
+YARN-1710. Logic to find allocations within a Plan that satisfy 
+user ReservationRequest(s). (Carlo Curino and Subru Krishnan via 
+curino) 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee027b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
new file mode 100644
index 0000000..3214f93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
@@ -0,0 +1,367 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationRequest} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadlines to be scheduled
+ * greedily as well. Combined with an opportunistic anticipation of work when
+ * the cluster is not fully utilized, it also seems to provide good latency for
+ * best-effort jobs (i.e., jobs running without a reservation).
+ * 
+ * This agent does not account for locality and only considers container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+public class GreedyReservationAgent implements ReservationAgent {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(GreedyReservationAgent.class);
+  // New reservation: no previous allocation exists, hence the trailing null.
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+    return computeAllocation(reservationId, user, plan, contract, null);
+  }
+  // Update: the plan's current allocation is passed along as the old one.
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+    return computeAllocation(reservationId, user, plan, contract,
+        plan.getReservationById(reservationId));
+  }
+  // Deletion involves no placement logic; it is delegated to the plan as-is.
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+    return plan.deleteReservation(reservationId);
+  }
+  // Shared by create and update: greedily places all stages of the contract.
+  private boolean computeAllocation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract,
+      ReservationAllocation oldReservation) throws PlanningException,
+      ContractValidationException {
+    LOG.info("placing the following ReservationRequest: " + contract);
+
+    Resource totalCapacity = plan.getTotalCapacity();
+
+    // Here we can add logic to adjust the ReservationDefinition to account for
+    // system "imperfections" (e.g., scheduling delays for large containers).
+
+    // Align with plan step conservatively (i.e., ceil arrival, and floor
+    // deadline)
+    long earliestStart = contract.getArrival();
+    long step = plan.getStep();
+    if (earliestStart % step != 0) {
+      earliestStart = earliestStart + (step - (earliestStart % step));
+    }
+    long deadline =
+        contract.getDeadline() - contract.getDeadline() % plan.getStep();
+
+    // setup temporary variables to handle time-relations between stages and
+    // intermediate answers
+    long curDeadline = deadline;
+    long oldDeadline = -1;
+
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    RLESparseResourceAllocation tempAssigned =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+    List<ReservationRequest> stages = contract.getReservationRequests()
+        .getReservationResources();
+    ReservationRequestInterpreter type = contract.getReservationRequests()
+        .getInterpreter();
+
+    // Iterate over the stages backward, starting from the deadline
+    for (ListIterator<ReservationRequest> li = 
+        stages.listIterator(stages.size()); li.hasPrevious();) {
+
+      ReservationRequest currentReservationStage = li.previous();
+
+      // validate that the RR respects basic constraints
+      validateInput(plan, currentReservationStage, totalCapacity);
+
+      // run allocation for a single stage
+      Map<ReservationInterval, ReservationRequest> curAlloc =
+          placeSingleStage(plan, tempAssigned, currentReservationStage,
+              earliestStart, curDeadline, oldReservation, totalCapacity);
+
+      if (curAlloc == null) {
+        // if we did not find an allocation for the currentReservationStage
+        // fail, unless the ReservationDefinition we are placing is of type
+        // ANY (in which case we move on to the next alternative)
+        if (type != ReservationRequestInterpreter.R_ANY) {
+          throw new PlanningException("The GreedyAgent"
+              + " couldn't find a valid allocation for your request");
+        } else {
+          continue;
+        }
+      } else {
+
+        // if we did find an allocation add it to the set of allocations
+        allocations.putAll(curAlloc);
+
+        // if this request is of type ANY we are done searching (greedy)
+        // and can return the current allocation (break-out of the search)
+        if (type == ReservationRequestInterpreter.R_ANY) {
+          break;
+        }
+
+        // if the request is of ORDER or ORDER_NO_GAP we constrain the next
+        // round of allocation to precede the current allocation, by setting
+        // curDeadline
+        if (type == ReservationRequestInterpreter.R_ORDER
+            || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+          curDeadline = findEarliestTime(curAlloc.keySet());
+
+          // for ORDER_NO_GAP verify that the allocation found so far has no
+          // gap (a slack of up to one plan step is tolerated), and throw
+          // otherwise: the greedy procedure failed to find a no-gap
+          // allocation
+          if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
+              && oldDeadline > 0) {
+            if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
+                .getStep()) {
+              throw new PlanningException("The GreedyAgent"
+                  + " couldn't find a valid allocation for your request");
+            }
+          }
+          // keep the variable oldDeadline pointing to the last deadline we
+          // found
+          oldDeadline = curDeadline;
+        }
+      }
+    }
+
+    // If no allocation was found for any stage of the ReservationDefinition,
+    // give up and report failure to the user
+    if (allocations.isEmpty()) {
+      throw new PlanningException("The GreedyAgent"
+          + " couldn't find a valid allocation for your request");
+    }
+
+    // create reservation with above allocations if not null/empty
+
+    ReservationRequest ZERO_RES =
+        ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
+
+    long firstStartTime = findEarliestTime(allocations.keySet());
+    
+    // add zero-padding from arrival up to the first non-null allocation
+    // to guarantee that the reservation exists starting at arrival
+    if (firstStartTime > earliestStart) {
+      allocations.put(new ReservationInterval(earliestStart,
+          firstStartTime), ZERO_RES);
+      firstStartTime = earliestStart;
+      // consider adding trailing zeros at the end for symmetry
+    }
+
+    // Actually add/update the reservation in the plan.
+    // This is subject to validation as other agents might be placing
+    // in parallel and there might be sharing policies the agent is not
+    // aware of.
+    ReservationAllocation capReservation =
+        new InMemoryReservationAllocation(reservationId, contract, user,
+            plan.getQueueName(), firstStartTime,
+            findLatestTime(allocations.keySet()), allocations,
+            plan.getResourceCalculator(), plan.getMinimumAllocation());
+    if (oldReservation != null) {
+      return plan.updateReservation(capReservation);
+    } else {
+      return plan.addReservation(capReservation);
+    }
+  }
+
+  private void validateInput(Plan plan, ReservationRequest rr,
+      Resource totalCapacity) throws ContractValidationException {
+    // Rejects a stage request that violates the basic contract constraints.
+    if (rr.getConcurrency() < 1) {
+      throw new ContractValidationException("Gang Size should be >= 1");
+    }
+
+    if (rr.getNumContainers() <= 0) {
+      throw new ContractValidationException("Num containers should be > 0");
+    }
+
+    // check that gangSize and numContainers are compatible
+    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+      throw new ContractValidationException(
+          "Parallelism must be an exact multiple of gang size");
+    }
+
+    // check that the largest container request does not exceed
+    // the cluster-wide limit for container sizes
+    if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
+        rr.getCapability(), plan.getMaximumAllocation())) {
+      throw new ContractValidationException("Individual"
+          + " capability requests should not exceed cluster's maxAlloc");
+    }
+  }
+
+  /**
+   * This method actually performs the placement of an atomic stage of the
+   * reservation. The key idea is to traverse the plan backward for a
+   * "lease-duration" worth of time, and compute what is the maximum multiple of
+   * our concurrency (gang) parameter we can fit. We do this and move towards
+   * previous instants in time until the time-window is exhausted or we placed
+   * the whole user request. Returns null if the stage could not be placed.
+   */
+  private Map<ReservationInterval, ReservationRequest> placeSingleStage(
+      Plan plan, RLESparseResourceAllocation tempAssigned,
+      ReservationRequest rr, long earliestStart, long curDeadline,
+      ReservationAllocation oldResAllocation, final Resource totalCapacity) {
+
+    Map<ReservationInterval, ReservationRequest> allocationRequests =
+        new HashMap<ReservationInterval, ReservationRequest>();
+
+    // compute the gang as a resource and get the duration
+    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+    long dur = rr.getDuration();
+    long step = plan.getStep();
+
+    // ceil the duration to the next multiple of the plan step
+    if (dur % step != 0) {
+      dur += (step - (dur % step));
+    }
+
+    // we know for sure that this division has no remainder (part of contract
+    // with user, validated beforehand)
+    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+    int maxGang = 0;
+
+    // loop trying to place until we are done, or we are considering
+    // an invalid range of times
+    while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
+
+      // as we run along we remember how many gangs we can fit, and what
+      // was the most constraining moment in time (we will restart just
+      // after that to place the next batch)
+      maxGang = gangsToPlace;
+      long minPoint = curDeadline;
+      int curMaxGang = maxGang;
+
+      // start placing at deadline (excluded due to [,) interval semantics and
+      // move backward
+      for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
+          && maxGang > 0; t = t - plan.getStep()) {
+
+        // As we run along we will logically remove the previous allocation for
+        // this reservation, if one existed (i.e., when a non-null old
+        // allocation was handed in)
+        Resource oldResCap = Resource.newInstance(0, 0);
+        if (oldResAllocation != null) {
+          oldResCap = oldResAllocation.getResourcesAtTime(t);
+        }
+
+        // compute net available resources
+        Resource netAvailableRes = Resources.clone(totalCapacity);
+        Resources.addTo(netAvailableRes, oldResCap);
+        Resources.subtractFrom(netAvailableRes,
+            plan.getTotalCommittedResources(t));
+        Resources.subtractFrom(netAvailableRes,
+            tempAssigned.getCapacityAtTime(t));
+
+        // compute maximum number of gangs we could fit
+        curMaxGang =
+            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+                totalCapacity, netAvailableRes, gang));
+
+        // pick the minimum between available resources in this instant, and how
+        // many gangs we have to place
+        curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+        // compare with previous max, and set it. also remember *where* we found
+        // the minimum (useful for next attempts)
+        if (curMaxGang <= maxGang) {
+          maxGang = curMaxGang;
+          minPoint = t;
+        }
+      }
+
+      // if we were able to place any gang, record this, and decrement
+      // gangsToPlace
+      if (maxGang > 0) {
+        gangsToPlace -= maxGang;
+
+        ReservationInterval reservationInt =
+            new ReservationInterval(curDeadline - dur, curDeadline);
+        ReservationRequest reservationRes =
+            ReservationRequest.newInstance(rr.getCapability(),
+                rr.getConcurrency() * maxGang, rr.getConcurrency(),
+                rr.getDuration());
+        // remember occupied space (plan is read-only till we find a plausible
+        // allocation for the entire request). This is needed since we might be
+        // placing other ReservationRequest within the same
+        // ReservationDefinition,
+        // and we must avoid double-counting the available resources
+        tempAssigned.addInterval(reservationInt, reservationRes);
+        allocationRequests.put(reservationInt, reservationRes);
+
+      }
+
+      // reset our new starting point (curDeadline) to the most constraining
+      // point so far, we will look "left" of that to find more places where
+      // to schedule gangs (for sure nothing on the "right" of this point can
+      // fit a full gang).
+      curDeadline = minPoint;
+    }
+
+    // if no gangs are left to place we succeed and return the allocation
+    if (gangsToPlace == 0) {
+      return allocationRequests;
+    } else {
+      // If we are here it is because we did not manage to satisfy this request.
+      // So we need to remove unwanted side-effect from tempAssigned (needed
+      // for ANY).
+      for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
+        allocationRequests.entrySet()) {
+        tempAssigned.removeInterval(tempAllocation.getKey(),
+            tempAllocation.getValue());
+      }
+      // and return null to signal failure in this allocation
+      return null;
+    }
+  }
+
+  /**
+   * Finds the leftmost point (smallest start time) of this set of intervals.
+   */
+  private long findEarliestTime(Set<ReservationInterval> resInt) {
+    long earliest = Long.MAX_VALUE;
+    for (ReservationInterval interval : resInt) {
+      earliest = Math.min(earliest, interval.getStartTime());
+    }
+    return earliest;
+  }
+
+  /**
+   * Finds the rightmost point (largest end time) of this set of intervals.
+   */
+  private long findLatestTime(Set<ReservationInterval> resInt) {
+    long latest = Long.MIN_VALUE;
+    for (ReservationInterval interval : resInt) {
+      latest = Math.max(latest, interval.getEndTime());
+    }
+    return latest;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee027b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
new file mode 100644
index 0000000..fe1941d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
@@ -0,0 +1,55 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy a user's contract
+ */
+public interface ReservationAgent {
+
+  /**
+   * Create a reservation for the user that abides by the specified contract
+   * 
+   * @param reservationId the identifier of the reservation to be created.
+   * @param user the user who wants to create the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for his
+   *          reservation
+   * 
+   * @return whether the create operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the plan
+   */
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Update a reservation for the user that abides by the specified contract
+   * 
+   * @param reservationId the identifier of the reservation to be updated
+   * @param user the user who wants to update the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for his
+   *          reservation
+   * 
+   * @return whether the update operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the plan
+   */
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Delete a user reservation
+   * 
+   * @param reservationId the identifier of the reservation to be deleted
+   * @param user the user who wants to delete the reservation
+   * @param plan the Plan from which the reservation must be removed
+   * 
+   * @return whether the delete operation was successful or not
+   * @throws PlanningException if the reservation cannot be deleted from the plan
+   */
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee027b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
new file mode 100644
index 0000000..7ee5a76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+public class ContractValidationException extends PlanningException {
+  // Raised when a reservation request violates basic contract constraints.
+  private static final long serialVersionUID = 1L;
+  // message: human-readable description of the violated constraint
+  public ContractValidationException(String message) {
+    super(message);
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee027b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
new file mode 100644
index 0000000..0b0201d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
@@ -0,0 +1,588 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestGreedyReservationAgent {
+
+  ReservationAgent agent;
+  InMemoryPlan plan;
+  Resource minAlloc = Resource.newInstance(1024, 1);
+  ResourceCalculator res = new DefaultResourceCalculator();
+  Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+  Random rand = new Random();
+  long step;
+
+  @Before
+  public void setup() throws Exception {
+    // Builds a fresh InMemoryPlan wired to a GreedyReservationAgent.
+    long seed = rand.nextLong();
+    rand.setSeed(seed);
+    Log.info("Running with seed: " + seed);
+
+    // setting completely loose quotas
+    long timeWindow = 1000000L;
+    Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
+    step = 1000L;
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125);
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    capConf.setReservationWindow(reservationQ, timeWindow);
+    capConf.setMaximumCapacity(reservationQ, 100);
+    capConf.setAverageCapacity(reservationQ, 100);
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, capConf, new HashSet<String>());
+    agent = new GreedyReservationAgent();
+
+    QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated",
+        mock(ParentQueue.class), false, capConf);
+
+    plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+        res, minAlloc, maxAlloc, "dedicated", null, true);
+  }
+
+  @SuppressWarnings("javadoc")
+  @Test
+  public void testSimple() throws PlanningException {
+
+    prepareBasicPlan();
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(5 * step);
+    rr.setDeadline(20 * step);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 5, 10 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+    for (long i = 10 * step; i < 20 * step; i++) {
+      assertTrue(
+          "Agent-based allocation unexpected",
+          Resources.equals(cs.getResourcesAtTime(i),
+              Resource.newInstance(2048 * 10, 2 * 10)));
+    }
+
+  }
+
+  @Test
+  public void testOrder() throws PlanningException {
+    prepareBasicPlan();
+
+    // create a completely utilized segment around time 30
+    int[] f = { 100, 100 };
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", 30 * step, 30 * step + f.length * step,
+            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+            res, minAlloc)));
+
+    // create a chain of 4 RR, mixing gang and non-gang
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0 * step);
+    rr.setDeadline(70 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20 * step);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    boolean created = agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate: the agent must report success, not merely avoid throwing
+    assertTrue("Agent-based allocation failed", created);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 4);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+
+    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  @Test
+  public void testOrderNoGapImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create a completely utilized segment at time 30
+    int[] f = { 100, 100 };
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", 30 * step, 30 * step + f.length * step,
+            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+            res, minAlloc)));
+
+    // create a chain of 4 RR, mixing gang and non-gang
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0 * step);
+    // times are in plan steps, so the rejection is caused by the no-gap rule
+    rr.setDeadline(70 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20 * step);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    boolean result = false;
+    try {
+      // submit to agent
+      result = agent.createReservation(reservationID, "u1", plan, rr);
+      fail();
+    } catch (PlanningException p) {
+      // expected
+    }
+
+    // validate
+    assertFalse("Agent-based allocation should have failed", result);
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 3);
+
+    System.out
+        .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+            + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  @Test
+  public void testOrderNoGap() throws PlanningException {
+    prepareBasicPlan();
+    // create a chain of 4 RR, mixing gang and non-gang
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0 * step);
+    rr.setDeadline(60 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20 * step);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+    // validate
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
+
+  }
+
+  @Test
+  public void testSingleSliding() throws PlanningException {
+    prepareBasicPlan();
+
+    // create a single request for which we need subsequent (tight) packing.
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100 * step);
+    rr.setDeadline(120 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 200, 10, 10 * step);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate results, we expect the second one to be accepted
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
+
+    System.out.println("--------AFTER packed ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ANY request with three alternatives and checks that the
+   * resulting reservation matches the second one: 10 containers of
+   * <2048, 2>, i.e. 20 units of <1024, 1>, over [110 * step, 120 * step).
+   */
+  @Test
+  public void testAny() throws PlanningException {
+    prepareBasicPlan();
+
+    // Three alternatives: the last one in the list (first considered) is
+    // meant to be impossible, the other two are satisfiable. The second one
+    // is the one expected in the plan.
+    ReservationRequest small = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 5, 5, 10 * step);
+    ReservationRequest medium = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 5, 10 * step);
+    ReservationRequest oversized = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 110, 110, 10 * step);
+
+    List<ReservationRequest> alternatives =
+        new ArrayList<ReservationRequest>();
+    alternatives.add(small);
+    alternatives.add(medium);
+    alternatives.add(oversized);
+
+    ReservationRequests anyOf = new ReservationRequestsPBImpl();
+    anyOf.setInterpreter(ReservationRequestInterpreter.R_ANY);
+    anyOf.setReservationResources(alternatives);
+
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setArrival(100 * step);
+    definition.setDeadline(120 * step);
+    definition.setReservationRequests(anyOf);
+
+    // hand the definition to the agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    boolean accepted =
+        agent.createReservation(reservationID, "u1", plan, definition);
+
+    // the agent must report success, and the plan must contain the two basic
+    // reservations plus the new one
+    assertTrue("Agent-based allocation failed", accepted);
+    int reservationCount = plan.getAllReservations().size();
+    assertTrue("Agent-based allocation failed", reservationCount == 3);
+
+    ReservationAllocation allocation = plan.getReservationById(reservationID);
+    assertTrue(allocation.toString(),
+        check(allocation, 110 * step, 120 * step, 20, 1024, 1));
+
+    System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+        + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ANY request whose alternatives are all meant to be
+   * unsatisfiable and expects the agent to reject it with a
+   * {@link PlanningException}, leaving only the two basic reservations in the
+   * plan.
+   */
+  @Test
+  public void testAnyImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create an ANY request, with all impossible alternatives
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100L);
+    rr.setDeadline(120L);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+    // longer than arrival-deadline
+    ReservationRequest r1 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 35, 5, 30);
+    // above max cluster size
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 110, 110, 10);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r1);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    try {
+      // submit to agent: any normal return (true or false) is a test failure,
+      // so the returned flag does not need a separate assertion afterwards
+      agent.createReservation(reservationID, "u1", plan, rr);
+      fail("Agent-based allocation should have failed");
+    } catch (PlanningException p) {
+      // expected
+    }
+    // validate results: no alternative was accepted, so the plan still holds
+    // only the two basic reservations
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 2);
+
+    System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  @Test
+  public void testAll() throws PlanningException {
+    prepareBasicPlan();
+    // create an ALL request
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100 * step);
+    rr.setDeadline(120 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 5, 5, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 10, 20 * step);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate results, we expect the second one to be accepted
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+
+    System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+        + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ALL request meant to be an unsatisfiable combination and
+   * expects the agent to reject it with a {@link PlanningException}, leaving
+   * only the two basic reservations in the plan.
+   */
+  @Test
+  public void testAllImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create an ALL request, with an impossible combination, it should be
+    // rejected, and allocation remain unchanged
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100L);
+    rr.setDeadline(120L);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 55, 5, 10);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 55, 5, 20);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    try {
+      // submit to agent: any normal return (true or false) is a test failure,
+      // so the returned flag does not need a separate assertion afterwards
+      agent.createReservation(reservationID, "u1", plan, rr);
+      fail("Agent-based allocation should have failed");
+    } catch (PlanningException p) {
+      // expected
+    }
+
+    // validate results: the request was rejected, so the plan still holds
+    // only the two basic reservations
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 2);
+
+    System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Seeds the plan with two fixed reservations, so that the agent under test
+   * does not work against an empty plan, and prints the resulting plan.
+   *
+   * @throws PlanningException if the plan refuses one of the reservations
+   */
+  private void prepareBasicPlan() throws PlanningException {
+
+    // first reservation: starts at time 0, one profile entry per step
+    int[] firstProfile = { 10, 10, 20, 20, 20, 10, 10 };
+    long firstStart = 0L;
+    long firstEnd = firstStart + firstProfile.length * step;
+    ReservationId firstId = ReservationSystemTestUtil.getNewReservationId();
+    Map<ReservationInterval, ReservationRequest> firstAlloc =
+        ReservationSystemTestUtil.generateAllocation(firstStart, step,
+            firstProfile);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(firstId, null,
+            "u1", "dedicated", firstStart, firstEnd, firstAlloc, res,
+            minAlloc)));
+
+    // second reservation: starts at time 5000, flat profile of 5 per step
+    int[] secondProfile = { 5, 5, 5, 5, 5, 5, 5 };
+    long secondStart = 5000;
+    long secondEnd = secondStart + secondProfile.length * step;
+    Map<ReservationInterval, ReservationRequest> secondAlloc =
+        ReservationSystemTestUtil.generateAllocation(secondStart, step,
+            secondProfile);
+    ReservationId secondId = ReservationSystemTestUtil.getNewReservationId();
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(secondId, null,
+            "u1", "dedicated", secondStart, secondEnd, secondAlloc, res,
+            minAlloc)));
+
+    System.out.println("--------BEFORE AGENT----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+  }
+
+  /**
+   * Checks that a reservation holds exactly the expected resources at every
+   * time instant of an interval.
+   *
+   * @param cs the reservation allocation to inspect
+   * @param start first instant to check (inclusive)
+   * @param end end of the interval (exclusive)
+   * @param containers expected number of containers
+   * @param mem memory of a single container
+   * @param cores virtual cores of a single container
+   * @return true if, for every instant in [start, end), the resources of
+   *         {@code cs} equal {@code containers} times {@code <mem, cores>};
+   *         true as well when the interval is empty
+   */
+  private boolean check(ReservationAllocation cs, long start, long end,
+      int containers, int mem, int cores) {
+
+    // the expected value does not depend on the instant: build it once
+    Resource expected =
+        Resource.newInstance(mem * containers, cores * containers);
+    for (long t = start; t < end; t++) {
+      if (!Resources.equals(cs.getResourcesAtTime(t), expected)) {
+        // the first mismatch settles the answer, no need to keep scanning
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stress run: replaces the plan with a new one built on a mocked capacity
+   * scheduler, generates {@code numJobs} random reservation definitions,
+   * submits them one by one to the agent and prints how many were accepted
+   * and how long the submissions took. Planning failures are counted as
+   * "not accepted" and otherwise ignored.
+   *
+   * @param numJobs number of random reservation definitions to submit
+   */
+  public void testStress(int numJobs) throws PlanningException, IOException {
+
+    final long timeWindow = 1000000L;
+    final Resource clusterCapacity =
+        Resource.newInstance(500 * 100 * 1024, 500 * 32);
+    step = 1000L;
+
+    // scheduler and sharing policy backing the plan
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    capConf.setReservationWindow(reservationQ, timeWindow);
+    capConf.setMaximumCapacity(reservationQ, 100);
+    capConf.setAverageCapacity(reservationQ, 100);
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, capConf, new HashSet<String>());
+
+    plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
+      clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
+
+    // generate all the definitions up front, so that only the submissions
+    // fall inside the timed section
+    List<ReservationDefinition> jobs = new ArrayList<ReservationDefinition>();
+    for (long seq = 0; seq < numJobs; seq++) {
+      jobs.add(ReservationSystemTestUtil.generateRandomRR(rand, seq));
+    }
+
+    int accepted = 0;
+    int jobIndex = 0;
+    long begin = System.currentTimeMillis();
+    for (ReservationDefinition job : jobs) {
+      try {
+        boolean ok = agent.createReservation(
+            ReservationSystemTestUtil.getNewReservationId(),
+            "u" + jobIndex % 100, plan, job);
+        if (ok) {
+          accepted++;
+        }
+      } catch (PlanningException p) {
+        // ignore exceptions
+      }
+      jobIndex++;
+    }
+    long elapsed = System.currentTimeMillis() - begin;
+
+    System.out.println("Submitted " + numJobs + " jobs " + " accepted "
+        + accepted + " in " + elapsed + "ms");
+  }
+
+  /**
+   * Command-line entry point for the stress test.
+   *
+   * @param arg optional; the first element, if present, is parsed as the
+   *          number of random jobs to submit (1000 when absent)
+   */
+  public static void main(String[] arg) {
+
+    // 1000 random jobs unless a count is given on the command line
+    int numJobs = arg.length > 0 ? Integer.parseInt(arg[0]) : 1000;
+
+    try {
+      TestGreedyReservationAgent stress = new TestGreedyReservationAgent();
+      stress.setup();
+      stress.testStress(numJobs);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}