You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/25 19:36:35 UTC
[28/29] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/156f24ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/156f24ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/156f24ea
Branch: refs/heads/HADOOP-12111
Commit: 156f24ead00436faad5d4aeef327a546392cd265
Parents: adcf5dd
Author: ccurino <cc...@ubuntu.gateway.2wire.net>
Authored: Sat Jul 25 07:39:47 2015 -0700
Committer: ccurino <cc...@ubuntu.gateway.2wire.net>
Committed: Sat Jul 25 07:39:47 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reservation/AbstractReservationSystem.java | 2 +
.../reservation/GreedyReservationAgent.java | 390 ---------
.../reservation/InMemoryPlan.java | 13 +-
.../InMemoryReservationAllocation.java | 8 +-
.../resourcemanager/reservation/Plan.java | 1 +
.../reservation/PlanContext.java | 2 +
.../resourcemanager/reservation/PlanView.java | 31 +-
.../resourcemanager/reservation/Planner.java | 47 --
.../RLESparseResourceAllocation.java | 55 +-
.../reservation/ReservationAgent.java | 72 --
.../ReservationSchedulerConfiguration.java | 6 +-
.../reservation/ReservationSystem.java | 5 +-
.../reservation/ReservationSystemUtil.java | 6 +-
.../reservation/SimpleCapacityReplanner.java | 113 ---
.../planning/AlignedPlannerWithGreedy.java | 123 +++
.../planning/GreedyReservationAgent.java | 97 +++
.../reservation/planning/IterativePlanner.java | 338 ++++++++
.../reservation/planning/Planner.java | 49 ++
.../reservation/planning/PlanningAlgorithm.java | 207 +++++
.../reservation/planning/ReservationAgent.java | 73 ++
.../planning/SimpleCapacityReplanner.java | 118 +++
.../reservation/planning/StageAllocator.java | 55 ++
.../planning/StageAllocatorGreedy.java | 152 ++++
.../planning/StageAllocatorLowCostAligned.java | 360 ++++++++
.../planning/StageEarliestStart.java | 46 ++
.../planning/StageEarliestStartByDemand.java | 106 +++
.../StageEarliestStartByJobArrival.java | 39 +
.../planning/TryManyReservationAgents.java | 114 +++
.../reservation/ReservationSystemTestUtil.java | 5 +-
.../reservation/TestCapacityOverTimePolicy.java | 2 +-
.../TestCapacitySchedulerPlanFollower.java | 1 +
.../reservation/TestFairReservationSystem.java | 1 -
.../TestFairSchedulerPlanFollower.java | 1 +
.../reservation/TestGreedyReservationAgent.java | 604 --------------
.../reservation/TestInMemoryPlan.java | 2 +
.../reservation/TestNoOverCommitPolicy.java | 1 +
.../TestRLESparseResourceAllocation.java | 51 +-
.../TestSchedulerPlanFollowerBase.java | 1 +
.../TestSimpleCapacityReplanner.java | 162 ----
.../planning/TestAlignedPlanner.java | 820 +++++++++++++++++++
.../planning/TestGreedyReservationAgent.java | 611 ++++++++++++++
.../planning/TestSimpleCapacityReplanner.java | 170 ++++
43 files changed, 3634 insertions(+), 1429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 55258a6..883d009 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -147,6 +147,9 @@ Release 2.8.0 - UNRELEASED
YARN-2019. Retrospect on decision of making RM crashed if any exception throw
in ZKRMStateStore. (Jian He via junping_du)
+ YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations.
+ (Jonathan Yaniv and Ishai Menache via curino)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 8a15ac6..d2603c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
deleted file mode 100644
index 214df1c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This Agent employs a simple greedy placement strategy, placing the various
- * stages of a {@link ReservationRequest} from the deadline moving backward
- * towards the arrival. This allows jobs with earlier deadline to be scheduled
- * greedily as well. Combined with an opportunistic anticipation of work if the
- * cluster is not fully utilized also seems to provide good latency for
- * best-effort jobs (i.e., jobs running without a reservation).
- *
- * This agent does not account for locality and only consider container
- * granularity for validation purposes (i.e., you can't exceed max-container
- * size).
- */
-public class GreedyReservationAgent implements ReservationAgent {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(GreedyReservationAgent.class);
-
- @Override
- public boolean createReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException {
- return computeAllocation(reservationId, user, plan, contract, null);
- }
-
- @Override
- public boolean updateReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException {
- return computeAllocation(reservationId, user, plan, contract,
- plan.getReservationById(reservationId));
- }
-
- @Override
- public boolean deleteReservation(ReservationId reservationId, String user,
- Plan plan) throws PlanningException {
- return plan.deleteReservation(reservationId);
- }
-
- private boolean computeAllocation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract,
- ReservationAllocation oldReservation) throws PlanningException,
- ContractValidationException {
- LOG.info("placing the following ReservationRequest: " + contract);
-
- Resource totalCapacity = plan.getTotalCapacity();
-
- // Here we can addd logic to adjust the ResourceDefinition to account for
- // system "imperfections" (e.g., scheduling delays for large containers).
-
- // Align with plan step conservatively (i.e., ceil arrival, and floor
- // deadline)
- long earliestStart = contract.getArrival();
- long step = plan.getStep();
- if (earliestStart % step != 0) {
- earliestStart = earliestStart + (step - (earliestStart % step));
- }
- long deadline =
- contract.getDeadline() - contract.getDeadline() % plan.getStep();
-
- // setup temporary variables to handle time-relations between stages and
- // intermediate answers
- long curDeadline = deadline;
- long oldDeadline = -1;
-
- Map<ReservationInterval, Resource> allocations =
- new HashMap<ReservationInterval, Resource>();
- RLESparseResourceAllocation tempAssigned =
- new RLESparseResourceAllocation(plan.getResourceCalculator(),
- plan.getMinimumAllocation());
-
- List<ReservationRequest> stages = contract.getReservationRequests()
- .getReservationResources();
- ReservationRequestInterpreter type = contract.getReservationRequests()
- .getInterpreter();
-
- boolean hasGang = false;
-
- // Iterate the stages in backward from deadline
- for (ListIterator<ReservationRequest> li =
- stages.listIterator(stages.size()); li.hasPrevious();) {
-
- ReservationRequest currentReservationStage = li.previous();
-
- // validate the RR respect basic constraints
- validateInput(plan, currentReservationStage, totalCapacity);
-
- hasGang |= currentReservationStage.getConcurrency() > 1;
-
- // run allocation for a single stage
- Map<ReservationInterval, Resource> curAlloc =
- placeSingleStage(plan, tempAssigned, currentReservationStage,
- earliestStart, curDeadline, oldReservation, totalCapacity);
-
- if (curAlloc == null) {
- // if we did not find an allocation for the currentReservationStage
- // return null, unless the ReservationDefinition we are placing is of
- // type ANY
- if (type != ReservationRequestInterpreter.R_ANY) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- } else {
- continue;
- }
- } else {
-
- // if we did find an allocation add it to the set of allocations
- allocations.putAll(curAlloc);
-
- // if this request is of type ANY we are done searching (greedy)
- // and can return the current allocation (break-out of the search)
- if (type == ReservationRequestInterpreter.R_ANY) {
- break;
- }
-
- // if the request is of ORDER or ORDER_NO_GAP we constraint the next
- // round of allocation to precede the current allocation, by setting
- // curDeadline
- if (type == ReservationRequestInterpreter.R_ORDER
- || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
- curDeadline = findEarliestTime(curAlloc.keySet());
-
- // for ORDER_NO_GAP verify that the allocation found so far has no
- // gap, return null otherwise (the greedy procedure failed to find a
- // no-gap
- // allocation)
- if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
- && oldDeadline > 0) {
- if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
- .getStep()) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- }
- }
- // keep the variable oldDeadline pointing to the last deadline we
- // found
- oldDeadline = curDeadline;
- }
- }
- }
-
- // / If we got here is because we failed to find an allocation for the
- // ReservationDefinition give-up and report failure to the user
- if (allocations.isEmpty()) {
- throw new PlanningException("The GreedyAgent"
- + " couldn't find a valid allocation for your request");
- }
-
- // create reservation with above allocations if not null/empty
-
- Resource ZERO_RES = Resource.newInstance(0, 0);
-
- long firstStartTime = findEarliestTime(allocations.keySet());
-
- // add zero-padding from arrival up to the first non-null allocation
- // to guarantee that the reservation exists starting at arrival
- if (firstStartTime > earliestStart) {
- allocations.put(new ReservationInterval(earliestStart,
- firstStartTime), ZERO_RES);
- firstStartTime = earliestStart;
- // consider to add trailing zeros at the end for simmetry
- }
-
- // Actually add/update the reservation in the plan.
- // This is subject to validation as other agents might be placing
- // in parallel and there might be sharing policies the agent is not
- // aware off.
- ReservationAllocation capReservation =
- new InMemoryReservationAllocation(reservationId, contract, user,
- plan.getQueueName(), firstStartTime,
- findLatestTime(allocations.keySet()), allocations,
- plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
- if (oldReservation != null) {
- return plan.updateReservation(capReservation);
- } else {
- return plan.addReservation(capReservation);
- }
- }
-
- private void validateInput(Plan plan, ReservationRequest rr,
- Resource totalCapacity) throws ContractValidationException {
-
- if (rr.getConcurrency() < 1) {
- throw new ContractValidationException("Gang Size should be >= 1");
- }
-
- if (rr.getNumContainers() <= 0) {
- throw new ContractValidationException("Num containers should be >= 0");
- }
-
- // check that gangSize and numContainers are compatible
- if (rr.getNumContainers() % rr.getConcurrency() != 0) {
- throw new ContractValidationException(
- "Parallelism must be an exact multiple of gang size");
- }
-
- // check that the largest container request does not exceed
- // the cluster-wide limit for container sizes
- if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
- rr.getCapability(), plan.getMaximumAllocation())) {
- throw new ContractValidationException("Individual"
- + " capability requests should not exceed cluster's maxAlloc");
- }
- }
-
- /**
- * This method actually perform the placement of an atomic stage of the
- * reservation. The key idea is to traverse the plan backward for a
- * "lease-duration" worth of time, and compute what is the maximum multiple of
- * our concurrency (gang) parameter we can fit. We do this and move towards
- * previous instant in time until the time-window is exhausted or we placed
- * all the user request.
- */
- private Map<ReservationInterval, Resource> placeSingleStage(
- Plan plan, RLESparseResourceAllocation tempAssigned,
- ReservationRequest rr, long earliestStart, long curDeadline,
- ReservationAllocation oldResAllocation, final Resource totalCapacity) {
-
- Map<ReservationInterval, Resource> allocationRequests =
- new HashMap<ReservationInterval, Resource>();
-
- // compute the gang as a resource and get the duration
- Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
- long dur = rr.getDuration();
- long step = plan.getStep();
-
- // ceil the duration to the next multiple of the plan step
- if (dur % step != 0) {
- dur += (step - (dur % step));
- }
-
- // we know for sure that this division has no remainder (part of contract
- // with user, validate before
- int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
-
- int maxGang = 0;
-
- // loop trying to place until we are done, or we are considering
- // an invalid range of times
- while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
-
- // as we run along we remember how many gangs we can fit, and what
- // was the most constraining moment in time (we will restart just
- // after that to place the next batch)
- maxGang = gangsToPlace;
- long minPoint = curDeadline;
- int curMaxGang = maxGang;
-
- // start placing at deadline (excluded due to [,) interval semantics and
- // move backward
- for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
- && maxGang > 0; t = t - plan.getStep()) {
-
- // As we run along we will logically remove the previous allocation for
- // this reservation
- // if one existed
- Resource oldResCap = Resource.newInstance(0, 0);
- if (oldResAllocation != null) {
- oldResCap = oldResAllocation.getResourcesAtTime(t);
- }
-
- // compute net available resources
- Resource netAvailableRes = Resources.clone(totalCapacity);
- Resources.addTo(netAvailableRes, oldResCap);
- Resources.subtractFrom(netAvailableRes,
- plan.getTotalCommittedResources(t));
- Resources.subtractFrom(netAvailableRes,
- tempAssigned.getCapacityAtTime(t));
-
- // compute maximum number of gangs we could fit
- curMaxGang =
- (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
- totalCapacity, netAvailableRes, gang));
-
- // pick the minimum between available resources in this instant, and how
- // many gangs we have to place
- curMaxGang = Math.min(gangsToPlace, curMaxGang);
-
- // compare with previous max, and set it. also remember *where* we found
- // the minimum (useful for next attempts)
- if (curMaxGang <= maxGang) {
- maxGang = curMaxGang;
- minPoint = t;
- }
- }
-
- // if we were able to place any gang, record this, and decrement
- // gangsToPlace
- if (maxGang > 0) {
- gangsToPlace -= maxGang;
-
- ReservationInterval reservationInt =
- new ReservationInterval(curDeadline - dur, curDeadline);
- ReservationRequest reservationRequest =
- ReservationRequest.newInstance(rr.getCapability(),
- rr.getConcurrency() * maxGang, rr.getConcurrency(),
- rr.getDuration());
- // remember occupied space (plan is read-only till we find a plausible
- // allocation for the entire request). This is needed since we might be
- // placing other ReservationRequest within the same
- // ReservationDefinition,
- // and we must avoid double-counting the available resources
- final Resource reservationRes = ReservationSystemUtil.toResource(
- reservationRequest);
- tempAssigned.addInterval(reservationInt, reservationRes);
- allocationRequests.put(reservationInt, reservationRes);
-
- }
-
- // reset our new starting point (curDeadline) to the most constraining
- // point so far, we will look "left" of that to find more places where
- // to schedule gangs (for sure nothing on the "right" of this point can
- // fit a full gang.
- curDeadline = minPoint;
- }
-
- // if no gangs are left to place we succeed and return the allocation
- if (gangsToPlace == 0) {
- return allocationRequests;
- } else {
- // If we are here is becasue we did not manage to satisfy this request.
- // So we need to remove unwanted side-effect from tempAssigned (needed
- // for ANY).
- for (Map.Entry<ReservationInterval, Resource> tempAllocation :
- allocationRequests.entrySet()) {
- tempAssigned.removeInterval(tempAllocation.getKey(),
- tempAllocation.getValue());
- }
- // and return null to signal failure in this allocation
- return null;
- }
- }
-
- // finds the leftmost point of this set of ReservationInterval
- private long findEarliestTime(Set<ReservationInterval> resInt) {
- long ret = Long.MAX_VALUE;
- for (ReservationInterval s : resInt) {
- if (s.getStartTime() < ret) {
- ret = s.getStartTime();
- }
- }
- return ret;
- }
-
- // finds the rightmost point of this set of ReservationIntervals
- private long findLatestTime(Set<ReservationInterval> resInt) {
- long ret = Long.MIN_VALUE;
- for (ReservationInterval s : resInt) {
- if (s.getEndTime() > ret) {
- ret = s.getEndTime();
- }
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 50d66cf..abc9c98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
@@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class InMemoryPlan implements Plan {
+/**
+ * This class represents an in memory representation of the state of our
+ * reservation system, and provides accelerated access to both individual
+ * reservations and aggregate utilization of resources over time.
+ */
+public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
@@ -75,7 +82,7 @@ class InMemoryPlan implements Plan {
private Resource totalCapacity;
- InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) {
@@ -83,7 +90,7 @@ class InMemoryPlan implements Plan {
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
}
- InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index a4dd23b..42a2243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
/**
* An in memory implementation of a reservation allocation using the
* {@link RLESparseResourceAllocation}
- *
+ *
*/
-class InMemoryReservationAllocation implements ReservationAllocation {
+public class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName;
private final ReservationId reservationID;
@@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
private RLESparseResourceAllocation resourcesOverTime;
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, Resource> allocations,
@@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
allocations, calculator, minAlloc, false);
}
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, Resource> allocations,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
index e8e9e29..f7ffbd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* A Plan represents the central data structure of a reservation system that
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
index 6d3506d..94e299e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index b49e99e..be68906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -1,26 +1,27 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* This interface provides a read-only view on the allocations made in this
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
deleted file mode 100644
index 57f28ff..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-public interface Planner {
-
- /**
- * Update the existing {@link Plan}, by adding/removing/updating existing
- * reservations, and adding a subset of the reservation requests in the
- * contracts parameter.
- *
- * @param plan the {@link Plan} to replan
- * @param contracts the list of reservation requests
- * @throws PlanningException
- */
- public void plan(Plan plan, List<ReservationDefinition> contracts)
- throws PlanningException;
-
- /**
- * Initialize the replanner
- *
- * @param planQueueName the name of the queue for this plan
- * @param conf the scheduler configuration
- */
- void init(String planQueueName, ReservationSchedulerConfiguration conf);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 2957cc6..80f2ff7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter;
/**
* This is a run length encoded sparse data structure that maintains resource
- * allocations over time
+ * allocations over time.
*/
public class RLESparseResourceAllocation {
@@ -74,7 +74,7 @@ public class RLESparseResourceAllocation {
/**
* Add a resource for the specified interval
- *
+ *
* @param reservationInterval the interval for which the resource is to be
* added
* @param totCap the resource to be added
@@ -138,7 +138,7 @@ public class RLESparseResourceAllocation {
/**
* Removes a resource for the specified interval
- *
+ *
* @param reservationInterval the interval for which the resource is to be
* removed
* @param totCap the resource to be removed
@@ -189,7 +189,7 @@ public class RLESparseResourceAllocation {
/**
* Returns the capacity, i.e. total resources allocated at the specified point
* of time
- *
+ *
* @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time
*/
@@ -208,7 +208,7 @@ public class RLESparseResourceAllocation {
/**
* Get the timestamp of the earliest resource allocation
- *
+ *
* @return the timestamp of the first resource allocation
*/
public long getEarliestStartTime() {
@@ -226,7 +226,7 @@ public class RLESparseResourceAllocation {
/**
* Get the timestamp of the latest resource allocation
- *
+ *
* @return the timestamp of the last resource allocation
*/
public long getLatestEndTime() {
@@ -244,7 +244,7 @@ public class RLESparseResourceAllocation {
/**
* Returns true if there are no non-zero entries
- *
+ *
* @return true if there are no allocations or false otherwise
*/
public boolean isEmpty() {
@@ -287,7 +287,7 @@ public class RLESparseResourceAllocation {
/**
* Returns the JSON string representation of the current resources allocated
* over time
- *
+ *
* @return the JSON string representation of the current resources allocated
* over time
*/
@@ -312,4 +312,43 @@ public class RLESparseResourceAllocation {
}
}
+ /**
+ * Returns the representation of the current resources allocated over time as
+ * an interval map.
+ *
+ * @return the representation of the current resources allocated over time as
+ * an interval map.
+ */
+ public Map<ReservationInterval, Resource> toIntervalMap() {
+
+ readLock.lock();
+ try {
+ Map<ReservationInterval, Resource> allocations =
+ new TreeMap<ReservationInterval, Resource>();
+
+ // Empty
+ if (isEmpty()) {
+ return allocations;
+ }
+
+ Map.Entry<Long, Resource> lastEntry = null;
+ for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+
+ if (lastEntry != null) {
+ ReservationInterval interval =
+ new ReservationInterval(lastEntry.getKey(), entry.getKey());
+ Resource resource = lastEntry.getValue();
+
+ allocations.put(interval, resource);
+ }
+
+ lastEntry = entry;
+ }
+ return allocations;
+ } finally {
+ readLock.unlock();
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
deleted file mode 100644
index 6955036..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-/**
- * An entity that seeks to acquire resources to satisfy an user's contract
- */
-public interface ReservationAgent {
-
- /**
- * Create a reservation for the user that abides by the specified contract
- *
- * @param reservationId the identifier of the reservation to be created.
- * @param user the user who wants to create the reservation
- * @param plan the Plan to which the reservation must be fitted
- * @param contract encapsulates the resources the user requires for his
- * session
- *
- * @return whether the create operation was successful or not
- * @throws PlanningException if the session cannot be fitted into the plan
- */
- public boolean createReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException;
-
- /**
- * Update a reservation for the user that abides by the specified contract
- *
- * @param reservationId the identifier of the reservation to be updated
- * @param user the user who wants to create the session
- * @param plan the Plan to which the reservation must be fitted
- * @param contract encapsulates the resources the user requires for his
- * reservation
- *
- * @return whether the update operation was successful or not
- * @throws PlanningException if the reservation cannot be fitted into the plan
- */
- public boolean updateReservation(ReservationId reservationId, String user,
- Plan plan, ReservationDefinition contract) throws PlanningException;
-
- /**
- * Delete an user reservation
- *
- * @param reservationId the identifier of the reservation to be deleted
- * @param user the user who wants to create the reservation
- * @param plan the Plan to which the session must be fitted
- *
- * @return whether the delete operation was successful or not
- * @throws PlanningException if the reservation cannot be fitted into the plan
- */
- public boolean deleteReservation(ReservationId reservationId, String user,
- Plan plan) throws PlanningException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
index 2af1ffd..c430b1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
public abstract class ReservationSchedulerConfiguration extends Configuration {
@@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
- "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy";
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
- "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner";
@InterfaceAudience.Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index cb76dcf..3309693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* This interface is the one implemented by any system that wants to support
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
index 8affae4..5562adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
import java.util.Map;
-final class ReservationSystemUtil {
+/**
+ * Simple helper class for static methods used to transform across
+ * common formats in tests
+ */
+public final class ReservationSystemUtil {
private ReservationSystemUtil() {
// not called
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
deleted file mode 100644
index b5a6a99..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This (re)planner scan a period of time from now to a maximum time window (or
- * the end of the last session, whichever comes first) checking the overall
- * capacity is not violated.
- *
- * It greedily removes sessions in reversed order of acceptance (latest accepted
- * is the first removed).
- */
-public class SimpleCapacityReplanner implements Planner {
-
- private static final Log LOG = LogFactory
- .getLog(SimpleCapacityReplanner.class);
-
- private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-
- private final Clock clock;
-
- // this allows to control to time-span of this replanning
- // far into the future time instants might be worth replanning for
- // later on
- private long lengthOfCheckZone;
-
- public SimpleCapacityReplanner() {
- this(new UTCClock());
- }
-
- @VisibleForTesting
- SimpleCapacityReplanner(Clock clock) {
- this.clock = clock;
- }
-
- @Override
- public void init(String planQueueName,
- ReservationSchedulerConfiguration conf) {
- this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
- }
-
- @Override
- public void plan(Plan plan, List<ReservationDefinition> contracts)
- throws PlanningException {
-
- if (contracts != null) {
- throw new RuntimeException(
- "SimpleCapacityReplanner cannot handle new reservation contracts");
- }
-
- ResourceCalculator resCalc = plan.getResourceCalculator();
- Resource totCap = plan.getTotalCapacity();
- long now = clock.getTime();
-
- // loop on all moment in time from now to the end of the check Zone
- // or the end of the planned sessions whichever comes first
- for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
- plan.getStep()) {
- Resource excessCap =
- Resources.subtract(plan.getTotalCommittedResources(t), totCap);
- // if we are violating
- if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
- // sorted on reverse order of acceptance, so newest reservations first
- Set<ReservationAllocation> curReservations =
- new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
- for (Iterator<ReservationAllocation> resIter =
- curReservations.iterator(); resIter.hasNext()
- && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
- ReservationAllocation reservation = resIter.next();
- plan.deleteReservation(reservation.getReservationId());
- excessCap =
- Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
- LOG.info("Removing reservation " + reservation.getReservationId()
- + " to repair physical-resource constraints in the plan: "
- + plan.getQueueName());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
new file mode 100644
index 0000000..a389928
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A planning algorithm that first runs LowCostAligned, and if it fails runs
+ * Greedy.
+ */
+public class AlignedPlannerWithGreedy implements ReservationAgent {
+
+ // Default smoothness factor
+ private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
+
+ // Log
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AlignedPlannerWithGreedy.class);
+
+ // Smoothness factor
+ private final ReservationAgent planner;
+
+ // Constructor
+ public AlignedPlannerWithGreedy() {
+ this(DEFAULT_SMOOTHNESS_FACTOR);
+ }
+
+ // Constructor
+ public AlignedPlannerWithGreedy(int smoothnessFactor) {
+
+ // List of algorithms
+ List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
+
+ // LowCostAligned planning algorithm
+ ReservationAgent algAligned =
+ new IterativePlanner(new StageEarliestStartByDemand(),
+ new StageAllocatorLowCostAligned(smoothnessFactor));
+ listAlg.add(algAligned);
+
+ // Greedy planning algorithm
+ ReservationAgent algGreedy =
+ new IterativePlanner(new StageEarliestStartByJobArrival(),
+ new StageAllocatorGreedy());
+ listAlg.add(algGreedy);
+
+ // Set planner:
+ // 1. Attempt to execute algAligned
+ // 2. If failed, fall back to algGreedy
+ planner = new TryManyReservationAgents(listAlg);
+
+ }
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("placing the following ReservationRequest: " + contract);
+
+ try {
+ boolean res =
+ planner.createReservation(reservationId, user, plan, contract);
+
+ if (res) {
+ LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ } else {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ }
+ return res;
+ } catch (PlanningException e) {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ + ", Contract: " + contract.toString());
+ throw e;
+ }
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("updating the following ReservationRequest: " + contract);
+
+ return planner.updateReservation(reservationId, user, plan, contract);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ LOG.info("removing the following ReservationId: " + reservationId);
+
+ return planner.deleteReservation(reservationId, user, plan);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
new file mode 100644
index 0000000..db82a66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationDefinition} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadline to be scheduled
+ * greedily as well. Combined with an opportunistic anticipation of work if the
+ * cluster is not fully utilized also seems to provide good latency for
+ * best-effort jobs (i.e., jobs running without a reservation).
+ *
+ * This agent does not account for locality and only consider container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+
+public class GreedyReservationAgent implements ReservationAgent {
+
+ // Log
+ private static final Logger LOG = LoggerFactory
+ .getLogger(GreedyReservationAgent.class);
+
+ // Greedy planner
+ private final ReservationAgent planner = new IterativePlanner(
+ new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("placing the following ReservationRequest: " + contract);
+
+ try {
+ boolean res =
+ planner.createReservation(reservationId, user, plan, contract);
+
+ if (res) {
+ LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ } else {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ + reservationId.toString() + ", Contract: " + contract.toString());
+ }
+ return res;
+ } catch (PlanningException e) {
+ LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ + ", Contract: " + contract.toString());
+ throw e;
+ }
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ LOG.info("updating the following ReservationRequest: " + contract);
+
+ return planner.updateReservation(reservationId, user, plan, contract);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ LOG.info("removing the following ReservationId: " + reservationId);
+
+ return planner.deleteReservation(reservationId, user, plan);
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
new file mode 100644
index 0000000..342c2e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A planning algorithm consisting of two main phases. The algorithm iterates
+ * over the job stages in descending order. For each stage, the algorithm: 1.
+ * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
+ * is allocated. 2. Computes an allocation for the stage inside the interval.
+ *
+ * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
+ * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
+ * each stage is set as succcessorStartTime - the starting time of its
+ * succeeding stage (or jobDeadline if it is the last stage).
+ *
+ * The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
+ * setAlgComputeStageAllocation
+ */
+public class IterativePlanner extends PlanningAlgorithm {
+
+ // Modifications performed by the algorithm that are not been reflected in the
+ // actual plan while a request is still pending.
+ private RLESparseResourceAllocation planModifications;
+
+ // Data extracted from plan
+ private Map<Long, Resource> planLoads;
+ private Resource capacity;
+ private long step;
+
+ // Job parameters
+ private ReservationRequestInterpreter jobType;
+ private long jobArrival;
+ private long jobDeadline;
+
+ // Phase algorithms
+ private StageEarliestStart algStageEarliestStart = null;
+ private StageAllocator algStageAllocator = null;
+
+ // Constructor
+ public IterativePlanner(StageEarliestStart algEarliestStartTime,
+ StageAllocator algStageAllocator) {
+
+ setAlgStageEarliestStart(algEarliestStartTime);
+ setAlgStageAllocator(algStageAllocator);
+
+ }
+
+ @Override
+ public RLESparseResourceAllocation computeJobAllocation(Plan plan,
+ ReservationId reservationId, ReservationDefinition reservation)
+ throws ContractValidationException {
+
+ // Initialize
+ initialize(plan, reservation);
+
+ // If the job has been previously reserved, logically remove its allocation
+ ReservationAllocation oldReservation =
+ plan.getReservationById(reservationId);
+ if (oldReservation != null) {
+ ignoreOldAllocation(oldReservation);
+ }
+
+ // Create the allocations data structure
+ RLESparseResourceAllocation allocations =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ // Get a reverse iterator for the set of stages
+ ListIterator<ReservationRequest> li =
+ reservation
+ .getReservationRequests()
+ .getReservationResources()
+ .listIterator(
+ reservation.getReservationRequests().getReservationResources()
+ .size());
+
+ // Current stage
+ ReservationRequest currentReservationStage;
+
+ // Index, points on the current node
+ int index =
+ reservation.getReservationRequests().getReservationResources().size();
+
+ // Stage deadlines
+ long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
+ long successorStartingTime = -1;
+
+ // Iterate the stages in reverse order
+ while (li.hasPrevious()) {
+
+ // Get current stage
+ currentReservationStage = li.previous();
+ index -= 1;
+
+ // Validate that the ReservationRequest respects basic constraints
+ validateInputStage(plan, currentReservationStage);
+
+ // Compute an adjusted earliestStart for this resource
+ // (we need this to provision some space for the ORDER contracts)
+ long stageArrivalTime = reservation.getArrival();
+ if (jobType == ReservationRequestInterpreter.R_ORDER
+ || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+ stageArrivalTime =
+ computeEarliestStartingTime(plan, reservation, index,
+ currentReservationStage, stageDeadline);
+ }
+ stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+ stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+
+ // Compute the allocation of a single stage
+ Map<ReservationInterval, Resource> curAlloc =
+ computeStageAllocation(plan, currentReservationStage,
+ stageArrivalTime, stageDeadline);
+
+ // If we did not find an allocation, return NULL
+ // (unless it's an ANY job, then we simply continue).
+ if (curAlloc == null) {
+
+ // If it's an ANY job, we can move to the next possible request
+ if (jobType == ReservationRequestInterpreter.R_ANY) {
+ continue;
+ }
+
+ // Otherwise, the job cannot be allocated
+ return null;
+
+ }
+
+ // Get the start & end time of the current allocation
+ Long stageStartTime = findEarliestTime(curAlloc.keySet());
+ Long stageEndTime = findLatestTime(curAlloc.keySet());
+
+ // If we did find an allocation for the stage, add it
+ for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+ allocations.addInterval(entry.getKey(), entry.getValue());
+ }
+
+ // If this is an ANY clause, we have finished
+ if (jobType == ReservationRequestInterpreter.R_ANY) {
+ break;
+ }
+
+ // If ORDER job, set the stageDeadline of the next stage to be processed
+ if (jobType == ReservationRequestInterpreter.R_ORDER
+ || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+
+ // Verify that there is no gap, in case the job is ORDER_NO_GAP
+ if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
+ && successorStartingTime != -1
+ && successorStartingTime > stageEndTime) {
+
+ return null;
+
+ }
+
+ // Store the stageStartTime and set the new stageDeadline
+ successorStartingTime = stageStartTime;
+ stageDeadline = stageStartTime;
+
+ }
+
+ }
+
+ // If the allocation is empty, return an error
+ if (allocations.isEmpty()) {
+ return null;
+ }
+
+ return allocations;
+
+ }
+
+ protected void initialize(Plan plan, ReservationDefinition reservation) {
+
+ // Get plan step & capacity
+ capacity = plan.getTotalCapacity();
+ step = plan.getStep();
+
+ // Get job parameters (type, arrival time & deadline)
+ jobType = reservation.getReservationRequests().getInterpreter();
+ jobArrival = stepRoundUp(reservation.getArrival(), step);
+ jobDeadline = stepRoundDown(reservation.getDeadline(), step);
+
+ // Dirty read of plan load
+ planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+
+ // Initialize the plan modifications
+ planModifications =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ }
+
+ private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
+ long endTime) {
+
+ // Create map
+ Map<Long, Resource> loads = new HashMap<Long, Resource>();
+
+ // Calculate the load for every time slot between [start,end)
+ for (long t = startTime; t < endTime; t += step) {
+ Resource load = plan.getTotalCommittedResources(t);
+ loads.put(t, load);
+ }
+
+ // Return map
+ return loads;
+
+ }
+
+ private void ignoreOldAllocation(ReservationAllocation oldReservation) {
+
+ // If there is no old reservation, return
+ if (oldReservation == null) {
+ return;
+ }
+
+ // Subtract each allocation interval from the planModifications
+ for (Entry<ReservationInterval, Resource> entry : oldReservation
+ .getAllocationRequests().entrySet()) {
+
+ // Read the entry
+ ReservationInterval interval = entry.getKey();
+ Resource resource = entry.getValue();
+
+ // Find the actual request
+ Resource negativeResource = Resources.multiply(resource, -1);
+
+ // Insert it into planModifications as a 'negative' request, to
+ // represent available resources
+ planModifications.addInterval(interval, negativeResource);
+
+ }
+
+ }
+
+ private void validateInputStage(Plan plan, ReservationRequest rr)
+ throws ContractValidationException {
+
+ // Validate concurrency
+ if (rr.getConcurrency() < 1) {
+ throw new ContractValidationException("Gang Size should be >= 1");
+ }
+
+ // Validate number of containers
+ if (rr.getNumContainers() <= 0) {
+ throw new ContractValidationException("Num containers should be > 0");
+ }
+
+ // Check that gangSize and numContainers are compatible
+ if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+ throw new ContractValidationException(
+ "Parallelism must be an exact multiple of gang size");
+ }
+
+ // Check that the largest container request does not exceed the cluster-wide
+ // limit for container sizes
+ if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
+ rr.getCapability(), plan.getMaximumAllocation())) {
+
+ throw new ContractValidationException(
+ "Individual capability requests should not exceed cluster's " +
+ "maxAlloc");
+
+ }
+
+ }
+
+ // Call algEarliestStartTime()
+ protected long computeEarliestStartingTime(Plan plan,
+ ReservationDefinition reservation, int index,
+ ReservationRequest currentReservationStage, long stageDeadline) {
+
+ return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
+ currentReservationStage, stageDeadline);
+
+ }
+
+ // Call algStageAllocator
+ protected Map<ReservationInterval, Resource> computeStageAllocation(
+ Plan plan, ReservationRequest rr, long stageArrivalTime,
+ long stageDeadline) {
+
+ return algStageAllocator.computeStageAllocation(plan, planLoads,
+ planModifications, rr, stageArrivalTime, stageDeadline);
+
+ }
+
+ // Set the algorithm: algStageEarliestStart
+ public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+
+ this.algStageEarliestStart = alg;
+ return this; // To allow concatenation of setAlg() functions
+
+ }
+
+ // Set the algorithm: algStageAllocator
+ public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
+
+ this.algStageAllocator = alg;
+ return this; // To allow concatenation of setAlg() functions
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
new file mode 100644
index 0000000..abac6ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+public interface Planner {
+
+ /**
+ * Update the existing {@link Plan}, by adding/removing/updating existing
+ * reservations, and adding a subset of the reservation requests in the
+ * contracts parameter.
+ *
+ * @param plan the {@link Plan} to replan
+ * @param contracts the list of reservation requests
+ * @throws PlanningException
+ */
+ public void plan(Plan plan, List<ReservationDefinition> contracts)
+ throws PlanningException;
+
+ /**
+ * Initialize the replanner
+ *
+ * @param planQueueName the name of the queue for this plan
+ * @param conf the scheduler configuration
+ */
+ void init(String planQueueName, ReservationSchedulerConfiguration conf);
+}