You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2015/07/25 16:44:06 UTC

[3/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
new file mode 100644
index 0000000..9a0a0f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An abstract class that follows the general behavior of planning algorithms.
+ */
+public abstract class PlanningAlgorithm implements ReservationAgent {
+
+  /**
+   * Performs the actual allocation for a ReservationDefinition within a Plan.
+   *
+   * @param reservationId the identifier of the reservation
+   * @param user the user who owns the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources required by the user for his
+   *          session
+   * @param oldReservation the existing reservation (null if none)
+   * @return whether the allocateUser function was successful or not
+   *
+   * @throws PlanningException if the session cannot be fitted into the plan
+   * @throws ContractValidationException
+   */
+  protected boolean allocateUser(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract,
+      ReservationAllocation oldReservation) throws PlanningException,
+      ContractValidationException {
+
+    // Adjust the ResourceDefinition to account for system "imperfections"
+    // (e.g., scheduling delays for large containers).
+    ReservationDefinition adjustedContract = adjustContract(plan, contract);
+
+    // Compute the job allocation
+    RLESparseResourceAllocation allocation =
+        computeJobAllocation(plan, reservationId, adjustedContract);
+
+    // If no job allocation was found, fail
+    if (allocation == null) {
+      throw new PlanningException(
+          "The planning algorithm could not find a valid allocation"
+              + " for your request");
+    }
+
+    // Translate the allocation to a map (with zero paddings)
+    long step = plan.getStep();
+    long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
+    long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
+    Map<ReservationInterval, Resource> mapAllocations =
+        allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
+
+    // Create the reservation
+    ReservationAllocation capReservation =
+        new InMemoryReservationAllocation(reservationId, // ID
+            adjustedContract, // Contract
+            user, // User name
+            plan.getQueueName(), // Queue name
+            findEarliestTime(mapAllocations.keySet()), // Earliest start time
+            findLatestTime(mapAllocations.keySet()), // Latest end time
+            mapAllocations, // Allocations
+            plan.getResourceCalculator(), // Resource calculator
+            plan.getMinimumAllocation()); // Minimum allocation
+
+    // Add (or update) the reservation allocation
+    if (oldReservation != null) {
+      return plan.updateReservation(capReservation);
+    } else {
+      return plan.addReservation(capReservation);
+    }
+
+  }
+
+  private Map<ReservationInterval, Resource>
+      allocationsToPaddedMap(RLESparseResourceAllocation allocation,
+          long jobArrival, long jobDeadline) {
+
+    // Allocate
+    Map<ReservationInterval, Resource> mapAllocations =
+        allocation.toIntervalMap();
+
+    // Zero allocation
+    Resource zeroResource = Resource.newInstance(0, 0);
+
+    // Pad at the beginning
+    long earliestStart = findEarliestTime(mapAllocations.keySet());
+    if (jobArrival < earliestStart) {
+      mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
+          zeroResource);
+    }
+
+    // Pad at the beginning
+    long latestEnd = findLatestTime(mapAllocations.keySet());
+    if (latestEnd < jobDeadline) {
+      mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
+          zeroResource);
+    }
+
+    return mapAllocations;
+
+  }
+
+  public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
+      ReservationId reservationId, ReservationDefinition reservation)
+      throws PlanningException, ContractValidationException;
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Allocate
+    return allocateUser(reservationId, user, plan, contract, null);
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Get the old allocation
+    ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
+
+    // Allocate (ignores the old allocation)
+    return allocateUser(reservationId, user, plan, contract, oldAlloc);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    // Delete the existing reservation
+    return plan.deleteReservation(reservationId);
+
+  }
+
+  protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+
+    long ret = Long.MAX_VALUE;
+    for (ReservationInterval s : sesInt) {
+      if (s.getStartTime() < ret) {
+        ret = s.getStartTime();
+      }
+    }
+    return ret;
+
+  }
+
+  protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+
+    long ret = Long.MIN_VALUE;
+    for (ReservationInterval s : sesInt) {
+      if (s.getEndTime() > ret) {
+        ret = s.getEndTime();
+      }
+    }
+    return ret;
+
+  }
+
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+
+  private ReservationDefinition adjustContract(Plan plan,
+      ReservationDefinition originalContract) {
+
+    // Place here adjustment. For example using QueueMetrics we can track
+    // large container delays per YARN-YARN-1990
+
+    return originalContract;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
new file mode 100644
index 0000000..bdea2f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy an user's contract
+ */
+public interface ReservationAgent {
+
+  /**
+   * Create a reservation for the user that abides by the specified contract
+   * 
+   * @param reservationId the identifier of the reservation to be created.
+   * @param user the user who wants to create the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for his
+   *          session
+   * 
+   * @return whether the create operation was successful or not
+   * @throws PlanningException if the session cannot be fitted into the plan
+   */
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Update a reservation for the user that abides by the specified contract
+   * 
+   * @param reservationId the identifier of the reservation to be updated
+   * @param user the user who wants to create the session
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for his
+   *          reservation
+   * 
+   * @return whether the update operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the plan
+   */
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Delete an user reservation
+   * 
+   * @param reservationId the identifier of the reservation to be deleted
+   * @param user the user who wants to create the reservation
+   * @param plan the Plan to which the session must be fitted
+   * 
+   * @return whether the delete operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the plan
+   */
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..7507783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This (re)planner scan a period of time from now to a maximum time window (or
+ * the end of the last session, whichever comes first) checking the overall
+ * capacity is not violated.
+ * 
+ * It greedily removes sessions in reversed order of acceptance (latest accepted
+ * is the first removed).
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+  private static final Log LOG = LogFactory
+      .getLog(SimpleCapacityReplanner.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private final Clock clock;
+
+  // this allows to control to time-span of this replanning
+  // far into the future time instants might be worth replanning for
+  // later on
+  private long lengthOfCheckZone;
+
+  public SimpleCapacityReplanner() {
+    this(new UTCClock());
+  }
+
+  @VisibleForTesting
+  SimpleCapacityReplanner(Clock clock) {
+    this.clock = clock;
+  }
+
+  @Override
+  public void init(String planQueueName,
+      ReservationSchedulerConfiguration conf) {
+    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+  }
+
+  @Override
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException {
+
+    if (contracts != null) {
+      throw new RuntimeException(
+          "SimpleCapacityReplanner cannot handle new reservation contracts");
+    }
+
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource totCap = plan.getTotalCapacity();
+    long now = clock.getTime();
+
+    // loop on all moment in time from now to the end of the check Zone
+    // or the end of the planned sessions whichever comes first
+    for (long t = now; 
+         (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); 
+         t += plan.getStep()) {
+      Resource excessCap =
+          Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+      // if we are violating
+      if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+        // sorted on reverse order of acceptance, so newest reservations first
+        Set<ReservationAllocation> curReservations =
+            new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+        for (Iterator<ReservationAllocation> resIter =
+            curReservations.iterator(); resIter.hasNext()
+            && Resources.greaterThan(resCalc, totCap, excessCap, 
+                ZERO_RESOURCE);) {
+          ReservationAllocation reservation = resIter.next();
+          plan.deleteReservation(reservation.getReservationId());
+          excessCap =
+              Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+          LOG.info("Removing reservation " + reservation.getReservationId()
+              + " to repair physical-resource constraints in the plan: "
+              + plan.getQueueName());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
new file mode 100644
index 0000000..9df6b74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * Interface for allocating a single stage in IterativePlanner.
+ */
+public interface StageAllocator {
+
+  /**
+   * Computes the allocation of a stage inside a defined time interval.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param planLoads a 'dirty' read of the plan loads at each time
+   * @param planModifications the allocations performed by the planning
+   *          algorithm which are not yet reflected by plan
+   * @param rr the stage
+   * @param stageEarliestStart the arrival time (earliest starting time) set for
+   *          the stage by the two phase planning algorithm
+   * @param stageDeadline the deadline of the stage set by the two phase
+   *          planning algorithm
+   *
+   * @return The computed allocation (or null if the stage could not be
+   *         allocated)
+   */
+  Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
new file mode 100644
index 0000000..773fbdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the rightmost
+ * (latest) free interval.
+ */
+
+public class StageAllocatorGreedy implements StageAllocator {
+
+  @Override
+  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline) {
+
+    Resource totalCapacity = plan.getTotalCapacity();
+
+    Map<ReservationInterval, Resource> allocationRequests =
+        new HashMap<ReservationInterval, Resource>();
+
+    // compute the gang as a resource and get the duration
+    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+    long dur = rr.getDuration();
+    long step = plan.getStep();
+
+    // ceil the duration to the next multiple of the plan step
+    if (dur % step != 0) {
+      dur += (step - (dur % step));
+    }
+
+    // we know for sure that this division has no remainder (part of contract
+    // with user, validate before
+    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+    int maxGang = 0;
+
+    // loop trying to place until we are done, or we are considering
+    // an invalid range of times
+    while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
+
+      // as we run along we remember how many gangs we can fit, and what
+      // was the most constraining moment in time (we will restart just
+      // after that to place the next batch)
+      maxGang = gangsToPlace;
+      long minPoint = stageDeadline;
+      int curMaxGang = maxGang;
+
+      // start placing at deadline (excluded due to [,) interval semantics and
+      // move backward
+      for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
+          && maxGang > 0; t = t - plan.getStep()) {
+
+        // compute net available resources
+        Resource netAvailableRes = Resources.clone(totalCapacity);
+        // Resources.addTo(netAvailableRes, oldResCap);
+        Resources.subtractFrom(netAvailableRes,
+            plan.getTotalCommittedResources(t));
+        Resources.subtractFrom(netAvailableRes,
+            planModifications.getCapacityAtTime(t));
+
+        // compute maximum number of gangs we could fit
+        curMaxGang =
+            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+                totalCapacity, netAvailableRes, gang));
+
+        // pick the minimum between available resources in this instant, and how
+        // many gangs we have to place
+        curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+        // compare with previous max, and set it. also remember *where* we found
+        // the minimum (useful for next attempts)
+        if (curMaxGang <= maxGang) {
+          maxGang = curMaxGang;
+          minPoint = t;
+        }
+      }
+
+      // if we were able to place any gang, record this, and decrement
+      // gangsToPlace
+      if (maxGang > 0) {
+        gangsToPlace -= maxGang;
+
+        ReservationInterval reservationInt =
+            new ReservationInterval(stageDeadline - dur, stageDeadline);
+        Resource reservationRes =
+            Resources.multiply(rr.getCapability(), rr.getConcurrency()
+                * maxGang);
+        // remember occupied space (plan is read-only till we find a plausible
+        // allocation for the entire request). This is needed since we might be
+        // placing other ReservationRequest within the same
+        // ReservationDefinition,
+        // and we must avoid double-counting the available resources
+        planModifications.addInterval(reservationInt, reservationRes);
+        allocationRequests.put(reservationInt, reservationRes);
+
+      }
+
+      // reset our new starting point (curDeadline) to the most constraining
+      // point so far, we will look "left" of that to find more places where
+      // to schedule gangs (for sure nothing on the "right" of this point can
+      // fit a full gang.
+      stageDeadline = minPoint;
+    }
+
+    // if no gangs are left to place we succeed and return the allocation
+    if (gangsToPlace == 0) {
+      return allocationRequests;
+    } else {
+      // If we are here is becasue we did not manage to satisfy this request.
+      // So we need to remove unwanted side-effect from tempAssigned (needed
+      // for ANY).
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation
+          : allocationRequests.entrySet()) {
+        planModifications.removeInterval(tempAllocation.getKey(),
+            tempAllocation.getValue());
+      }
+      // and return null to signal failure in this allocation
+      return null;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
new file mode 100644
index 0000000..4b5763d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A stage allocator that iteratively allocates containers in the
+ * {@link DurationInterval} with lowest overall cost. The algorithm only
+ * considers intervals of the form: [stageDeadline - (n+1)*duration,
+ * stageDeadline - n*duration) for an integer n. This guarantees that the
+ * allocations are aligned (as opposed to overlapping duration intervals).
+ *
+ * The smoothnessFactor parameter controls the number of containers that are
+ * simultaneously allocated in each iteration of the algorithm.
+ */
+
+public class StageAllocatorLowCostAligned implements StageAllocator {
+
+  // Smoothness factor
+  private int smoothnessFactor = 10;
+
+  // Constructor
+  public StageAllocatorLowCostAligned() {
+  }
+
+  // Constructor
+  public StageAllocatorLowCostAligned(int smoothnessFactor) {
+    this.smoothnessFactor = smoothnessFactor;
+  }
+
+  // computeJobAllocation()
+  @Override
+  public Map<ReservationInterval, Resource> computeStageAllocation(
+      Plan plan, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline) {
+
+    // Initialize
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource capacity = plan.getTotalCapacity();
+    long step = plan.getStep();
+
+    // Create allocationRequestsearlies
+    RLESparseResourceAllocation allocationRequests =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+    // Initialize parameters
+    long duration = stepRoundUp(rr.getDuration(), step);
+    int windowSizeInDurations =
+        (int) ((stageDeadline - stageEarliestStart) / duration);
+    int totalGangs = rr.getNumContainers() / rr.getConcurrency();
+    int numContainersPerGang = rr.getConcurrency();
+    Resource gang =
+        Resources.multiply(rr.getCapability(), numContainersPerGang);
+
+    // Set maxGangsPerUnit
+    int maxGangsPerUnit =
+        (int) Math.max(
+            Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+    maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
+
+    // If window size is too small, return null
+    if (windowSizeInDurations <= 0) {
+      return null;
+    }
+
+    // Initialize tree sorted by costs
+    TreeSet<DurationInterval> durationIntervalsSortedByCost =
+        new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
+          @Override
+          public int compare(DurationInterval val1, DurationInterval val2) {
+
+            int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
+            if (cmp != 0) {
+              return cmp;
+            }
+
+            return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+          }
+        });
+
+    // Add durationIntervals that end at (endTime - n*duration) for some n.
+    for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+        + duration; intervalEnd -= duration) {
+
+      long intervalStart = intervalEnd - duration;
+
+      // Get duration interval [intervalStart,intervalEnd)
+      DurationInterval durationInterval =
+          getDurationInterval(intervalStart, intervalEnd, planLoads,
+              planModifications, capacity, resCalc, step);
+
+      // If the interval can fit a gang, add it to the tree
+      if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+        durationIntervalsSortedByCost.add(durationInterval);
+      }
+    }
+
+    // Allocate
+    int remainingGangs = totalGangs;
+    while (remainingGangs > 0) {
+
+      // If no durationInterval can fit a gang, break and return null
+      if (durationIntervalsSortedByCost.isEmpty()) {
+        break;
+      }
+
+      // Get best duration interval
+      DurationInterval bestDurationInterval =
+          durationIntervalsSortedByCost.first();
+      int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
+
+      // Add it
+      remainingGangs -= numGangsToAllocate;
+
+      ReservationInterval reservationInt =
+          new ReservationInterval(bestDurationInterval.getStartTime(),
+              bestDurationInterval.getEndTime());
+
+      Resource reservationRes =
+          Resources.multiply(rr.getCapability(), rr.getConcurrency()
+              * numGangsToAllocate);
+
+      planModifications.addInterval(reservationInt, reservationRes);
+      allocationRequests.addInterval(reservationInt, reservationRes);
+
+      // Remove from tree
+      durationIntervalsSortedByCost.remove(bestDurationInterval);
+
+      // Get updated interval
+      DurationInterval updatedDurationInterval =
+          getDurationInterval(bestDurationInterval.getStartTime(),
+              bestDurationInterval.getStartTime() + duration, planLoads,
+              planModifications, capacity, resCalc, step);
+
+      // Add to tree, if possible
+      if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+        durationIntervalsSortedByCost.add(updatedDurationInterval);
+      }
+
+    }
+
+    // Get the final allocation
+    Map<ReservationInterval, Resource> allocations =
+        allocationRequests.toIntervalMap();
+
+    // If no gangs are left to place we succeed and return the allocation
+    if (remainingGangs <= 0) {
+      return allocations;
+    } else {
+
+      // If we are here is because we did not manage to satisfy this request.
+      // We remove unwanted side-effect from planModifications (needed for ANY).
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation
+          : allocations.entrySet()) {
+
+        planModifications.removeInterval(tempAllocation.getKey(),
+            tempAllocation.getValue());
+
+      }
+      // Return null to signal failure in this allocation
+      return null;
+
+    }
+
+  }
+
+  protected DurationInterval getDurationInterval(long startTime, long endTime,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc, long step) {
+
+    // Initialize the dominant loads structure
+    Resource dominantResources = Resource.newInstance(0, 0);
+
+    // Calculate totalCost and maxLoad
+    double totalCost = 0.0;
+    for (long t = startTime; t < endTime; t += step) {
+
+      // Get the load
+      Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+      // Increase the total cost
+      totalCost += calcCostOfLoad(load, capacity, resCalc);
+
+      // Update the dominant resources
+      dominantResources = Resources.componentwiseMax(dominantResources, load);
+
+    }
+
+    // Return the corresponding durationInterval
+    return new DurationInterval(startTime, endTime, totalCost,
+        dominantResources);
+
+  }
+
+  protected double calcCostOfInterval(long startTime, long endTime,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc, long step) {
+
+    // Sum costs in the interval [startTime,endTime)
+    double totalCost = 0.0;
+    for (long t = startTime; t < endTime; t += step) {
+      totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
+          resCalc);
+    }
+
+    // Return sum
+    return totalCost;
+
+  }
+
+  protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc) {
+
+    // Get the current load at time t
+    Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+    // Return cost
+    return calcCostOfLoad(load, capacity, resCalc);
+
+  }
+
+  protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications) {
+
+    Resource planLoad = planLoads.get(t);
+    planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+
+    return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
+
+  }
+
+  protected double calcCostOfLoad(Resource load, Resource capacity,
+      ResourceCalculator resCalc) {
+
+    return resCalc.ratio(load, capacity);
+
+  }
+
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+
+  /**
+   * An inner class that represents an interval, typically of length duration.
+   * The class holds the total cost of the interval and the maximal load inside
+   * the interval in each dimension (both calculated externally).
+   */
+  protected static class DurationInterval {
+
+    private long startTime;
+    private long endTime;
+    private double cost;
+    private Resource maxLoad;
+
+    // Constructor
+    public DurationInterval(long startTime, long endTime, double cost,
+        Resource maxLoad) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.cost = cost;
+      this.maxLoad = maxLoad;
+    }
+
+    // canAllocate() - boolean function, returns whether requestedResources
+    // can be allocated during the durationInterval without
+    // violating capacity constraints
+    public boolean canAllocate(Resource requestedResources, Resource capacity,
+        ResourceCalculator resCalc) {
+
+      Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
+      return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
+
+    }
+
+    // numCanFit() - returns the maximal number of requestedResources can be
+    // allocated during the durationInterval without violating
+    // capacity constraints
+    public int numCanFit(Resource requestedResources, Resource capacity,
+        ResourceCalculator resCalc) {
+
+      // Represents the largest resource demand that can be satisfied throughout
+      // the entire DurationInterval (i.e., during [startTime,endTime))
+      Resource availableResources = Resources.subtract(capacity, maxLoad);
+
+      // Maximal number of requestedResources that fit inside the interval
+      return (int) Math.floor(Resources.divide(resCalc, capacity,
+          availableResources, requestedResources));
+
+    }
+
+    public long getStartTime() {
+      return this.startTime;
+    }
+
+    public void setStartTime(long value) {
+      this.startTime = value;
+    }
+
+    public long getEndTime() {
+      return this.endTime;
+    }
+
+    public void setEndTime(long value) {
+      this.endTime = value;
+    }
+
+    public Resource getMaxLoad() {
+      return this.maxLoad;
+    }
+
+    public void setMaxLoad(Resource value) {
+      this.maxLoad = value;
+    }
+
+    public double getTotalCost() {
+      return this.cost;
+    }
+
+    public void setTotalCost(double value) {
+      this.cost = value;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
new file mode 100644
index 0000000..547616a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Interface for setting the earliest start time of a stage in IterativePlanner.
+ */
+public interface StageEarliestStart {
+
+  /**
+   * Computes the earliest allowed starting time for a given stage.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param reservation the job contract
+   * @param index the index of the stage in the job contract
+   * @param currentReservationStage the stage
+   * @param stageDeadline the deadline of the stage set by the two phase
+   *          planning algorithm
+   *
+   * @return the earliest allowed starting time for the stage.
+   */
+  long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
+          int index, ReservationRequest currentReservationStage,
+          long stageDeadline);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
new file mode 100644
index 0000000..5a46a4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.ListIterator;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage proportional to the job weight. The
+ * interval [jobArrival, stageDeadline) is divided as follows. First, each stage
+ * is guaranteed at least its requested duration. Then, the stage receives a
+ * fraction of the remaining time. The fraction is calculated as the ratio
+ * between the weight (total requested resources) of the stage and the total
+ * weight of all proceeding stages.
+ */
+
+public class StageEarliestStartByDemand implements StageEarliestStart {
+
+  private long step;
+
+  @Override
+  public long setEarliestStartTime(Plan plan,
+      ReservationDefinition reservation, int index, ReservationRequest current,
+      long stageDeadline) {
+
+    step = plan.getStep();
+
+    // If this is the first stage, don't bother with the computation.
+    if (index < 1) {
+      return reservation.getArrival();
+    }
+
+    // Get iterator
+    ListIterator<ReservationRequest> li =
+        reservation.getReservationRequests().getReservationResources()
+            .listIterator(index);
+    ReservationRequest rr;
+
+    // Calculate the total weight & total duration
+    double totalWeight = calcWeight(current);
+    long totalDuration = getRoundedDuration(current, plan);
+
+    while (li.hasPrevious()) {
+      rr = li.previous();
+      totalWeight += calcWeight(rr);
+      totalDuration += getRoundedDuration(rr, plan);
+    }
+
+    // Compute the weight of the current stage as compared to remaining ones
+    double ratio = calcWeight(current) / totalWeight;
+
+    // Estimate an early start time, such that:
+    // 1. Every stage is guaranteed to receive at least its duration
+    // 2. The remainder of the window is divided between stages
+    // proportionally to its workload (total memory consumption)
+    long window = stageDeadline - reservation.getArrival();
+    long windowRemainder = window - totalDuration;
+    long earlyStart =
+        (long) (stageDeadline - getRoundedDuration(current, plan)
+            - (windowRemainder * ratio));
+
+    // Realign if necessary (since we did some arithmetic)
+    earlyStart = stepRoundUp(earlyStart, step);
+
+    // Return
+    return earlyStart;
+
+  }
+
+  // Weight = total memory consumption of stage
+  protected double calcWeight(ReservationRequest stage) {
+    return (stage.getDuration() * stage.getCapability().getMemory())
+        * (stage.getNumContainers());
+  }
+
+  protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
+    return stepRoundUp(stage.getDuration(), step);
+  }
+
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
new file mode 100644
index 0000000..8347816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage as the job arrival time.
+ */
+public class StageEarliestStartByJobArrival implements StageEarliestStart {
+
+  @Override
+  public long setEarliestStartTime(Plan plan,
+      ReservationDefinition reservation, int index, ReservationRequest current,
+      long stageDeadline) {
+
+    return reservation.getArrival();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
new file mode 100644
index 0000000..1d37ce5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * A planning algorithm that invokes several other planning algorithms according
+ * to a given order. If one of the planners succeeds, the allocation it
+ * generates is returned.
+ */
+public class TryManyReservationAgents implements ReservationAgent {
+
+  // Planning algorithms
+  private final List<ReservationAgent> algs;
+
+  // Constructor
+  public TryManyReservationAgents(List<ReservationAgent> algs) {
+    this.algs = new LinkedList<ReservationAgent>(algs);
+  }
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Save the planning exception
+    PlanningException planningException = null;
+
+    // Try all of the algorithms, in order
+    for (ReservationAgent alg : algs) {
+
+      try {
+        if (alg.createReservation(reservationId, user, plan, contract)) {
+          return true;
+        }
+      } catch (PlanningException e) {
+        planningException = e;
+      }
+
+    }
+
+    // If all of the algorithms failed and one of the algorithms threw an
+    // exception, throw the last planning exception
+    if (planningException != null) {
+      throw planningException;
+    }
+
+    // If all of the algorithms failed, return false
+    return false;
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Save the planning exception
+    PlanningException planningException = null;
+
+    // Try all of the algorithms, in order
+    for (ReservationAgent alg : algs) {
+
+      try {
+        if (alg.updateReservation(reservationId, user, plan, contract)) {
+          return true;
+        }
+      } catch (PlanningException e) {
+        planningException = e;
+      }
+
+    }
+
+    // If all of the algorithms failed and one of the algorithms threw an
+    // exception, throw the last planning exception
+    if (planningException != null) {
+      throw planningException;
+    }
+
+    // If all of the algorithms failed, return false
+    return false;
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    return plan.deleteReservation(reservationId);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index be1d69a..adb9dcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -89,7 +90,7 @@ public class ReservationSystemTestUtil {
     Assert.assertEquals(planQName, plan.getQueueName());
     Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
     Assert.assertTrue(
-        plan.getReservationAgent() instanceof GreedyReservationAgent);
+        plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
     Assert.assertTrue(
         plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }
@@ -102,7 +103,7 @@ public class ReservationSystemTestUtil {
     Assert.assertEquals(newQ, newPlan.getQueueName());
     Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
     Assert
-        .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+        .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
     Assert
         .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 19f876d..f608c3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index b8663f6..15f9a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
index f294eaf..4b685b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
index e9a4f50..43316f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;