You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2015/07/25 16:49:42 UTC
[3/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement
Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
new file mode 100644
index 0000000..9a0a0f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An abstract class that follows the general behavior of planning algorithms.
+ */
+public abstract class PlanningAlgorithm implements ReservationAgent {
+
+ /**
+ * Performs the actual allocation for a ReservationDefinition within a Plan.
+ *
+ * @param reservationId the identifier of the reservation
+ * @param user the user who owns the reservation
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources required by the user for his
+ * session
+ * @param oldReservation the existing reservation (null if none)
+ * @return whether the allocateUser function was successful or not
+ *
+ * @throws PlanningException if the session cannot be fitted into the plan
+ * @throws ContractValidationException
+ */
+ protected boolean allocateUser(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract,
+ ReservationAllocation oldReservation) throws PlanningException,
+ ContractValidationException {
+
+ // Adjust the ResourceDefinition to account for system "imperfections"
+ // (e.g., scheduling delays for large containers).
+ ReservationDefinition adjustedContract = adjustContract(plan, contract);
+
+ // Compute the job allocation
+ RLESparseResourceAllocation allocation =
+ computeJobAllocation(plan, reservationId, adjustedContract);
+
+ // If no job allocation was found, fail
+ if (allocation == null) {
+ throw new PlanningException(
+ "The planning algorithm could not find a valid allocation"
+ + " for your request");
+ }
+
+ // Translate the allocation to a map (with zero paddings)
+ long step = plan.getStep();
+ long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
+ long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
+ Map<ReservationInterval, Resource> mapAllocations =
+ allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
+
+ // Create the reservation
+ ReservationAllocation capReservation =
+ new InMemoryReservationAllocation(reservationId, // ID
+ adjustedContract, // Contract
+ user, // User name
+ plan.getQueueName(), // Queue name
+ findEarliestTime(mapAllocations.keySet()), // Earliest start time
+ findLatestTime(mapAllocations.keySet()), // Latest end time
+ mapAllocations, // Allocations
+ plan.getResourceCalculator(), // Resource calculator
+ plan.getMinimumAllocation()); // Minimum allocation
+
+ // Add (or update) the reservation allocation
+ if (oldReservation != null) {
+ return plan.updateReservation(capReservation);
+ } else {
+ return plan.addReservation(capReservation);
+ }
+
+ }
+
+ private Map<ReservationInterval, Resource>
+ allocationsToPaddedMap(RLESparseResourceAllocation allocation,
+ long jobArrival, long jobDeadline) {
+
+ // Allocate
+ Map<ReservationInterval, Resource> mapAllocations =
+ allocation.toIntervalMap();
+
+ // Zero allocation
+ Resource zeroResource = Resource.newInstance(0, 0);
+
+ // Pad at the beginning
+ long earliestStart = findEarliestTime(mapAllocations.keySet());
+ if (jobArrival < earliestStart) {
+ mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
+ zeroResource);
+ }
+
+ // Pad at the beginning
+ long latestEnd = findLatestTime(mapAllocations.keySet());
+ if (latestEnd < jobDeadline) {
+ mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
+ zeroResource);
+ }
+
+ return mapAllocations;
+
+ }
+
+ public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
+ ReservationId reservationId, ReservationDefinition reservation)
+ throws PlanningException, ContractValidationException;
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Allocate
+ return allocateUser(reservationId, user, plan, contract, null);
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Get the old allocation
+ ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
+
+ // Allocate (ignores the old allocation)
+ return allocateUser(reservationId, user, plan, contract, oldAlloc);
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ // Delete the existing reservation
+ return plan.deleteReservation(reservationId);
+
+ }
+
+ protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+
+ long ret = Long.MAX_VALUE;
+ for (ReservationInterval s : sesInt) {
+ if (s.getStartTime() < ret) {
+ ret = s.getStartTime();
+ }
+ }
+ return ret;
+
+ }
+
+ protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+
+ long ret = Long.MIN_VALUE;
+ for (ReservationInterval s : sesInt) {
+ if (s.getEndTime() > ret) {
+ ret = s.getEndTime();
+ }
+ }
+ return ret;
+
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+
+ private ReservationDefinition adjustContract(Plan plan,
+ ReservationDefinition originalContract) {
+
+ // Place here adjustment. For example using QueueMetrics we can track
+ // large container delays per YARN-YARN-1990
+
+ return originalContract;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
new file mode 100644
index 0000000..bdea2f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy an user's contract
+ */
+public interface ReservationAgent {
+
+ /**
+ * Create a reservation for the user that abides by the specified contract
+ *
+ * @param reservationId the identifier of the reservation to be created.
+ * @param user the user who wants to create the reservation
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources the user requires for his
+ * session
+ *
+ * @return whether the create operation was successful or not
+ * @throws PlanningException if the session cannot be fitted into the plan
+ */
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException;
+
+ /**
+ * Update a reservation for the user that abides by the specified contract
+ *
+ * @param reservationId the identifier of the reservation to be updated
+ * @param user the user who wants to create the session
+ * @param plan the Plan to which the reservation must be fitted
+ * @param contract encapsulates the resources the user requires for his
+ * reservation
+ *
+ * @return whether the update operation was successful or not
+ * @throws PlanningException if the reservation cannot be fitted into the plan
+ */
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException;
+
+ /**
+ * Delete an user reservation
+ *
+ * @param reservationId the identifier of the reservation to be deleted
+ * @param user the user who wants to create the reservation
+ * @param plan the Plan to which the session must be fitted
+ *
+ * @return whether the delete operation was successful or not
+ * @throws PlanningException if the reservation cannot be fitted into the plan
+ */
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..7507783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This (re)planner scan a period of time from now to a maximum time window (or
+ * the end of the last session, whichever comes first) checking the overall
+ * capacity is not violated.
+ *
+ * It greedily removes sessions in reversed order of acceptance (latest accepted
+ * is the first removed).
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+ private static final Log LOG = LogFactory
+ .getLog(SimpleCapacityReplanner.class);
+
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+ private final Clock clock;
+
+ // this allows to control to time-span of this replanning
+ // far into the future time instants might be worth replanning for
+ // later on
+ private long lengthOfCheckZone;
+
+ public SimpleCapacityReplanner() {
+ this(new UTCClock());
+ }
+
+ @VisibleForTesting
+ SimpleCapacityReplanner(Clock clock) {
+ this.clock = clock;
+ }
+
+ @Override
+ public void init(String planQueueName,
+ ReservationSchedulerConfiguration conf) {
+ this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+ }
+
+ @Override
+ public void plan(Plan plan, List<ReservationDefinition> contracts)
+ throws PlanningException {
+
+ if (contracts != null) {
+ throw new RuntimeException(
+ "SimpleCapacityReplanner cannot handle new reservation contracts");
+ }
+
+ ResourceCalculator resCalc = plan.getResourceCalculator();
+ Resource totCap = plan.getTotalCapacity();
+ long now = clock.getTime();
+
+ // loop on all moment in time from now to the end of the check Zone
+ // or the end of the planned sessions whichever comes first
+ for (long t = now;
+ (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
+ t += plan.getStep()) {
+ Resource excessCap =
+ Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+ // if we are violating
+ if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+ // sorted on reverse order of acceptance, so newest reservations first
+ Set<ReservationAllocation> curReservations =
+ new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+ for (Iterator<ReservationAllocation> resIter =
+ curReservations.iterator(); resIter.hasNext()
+ && Resources.greaterThan(resCalc, totCap, excessCap,
+ ZERO_RESOURCE);) {
+ ReservationAllocation reservation = resIter.next();
+ plan.deleteReservation(reservation.getReservationId());
+ excessCap =
+ Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+ LOG.info("Removing reservation " + reservation.getReservationId()
+ + " to repair physical-resource constraints in the plan: "
+ + plan.getQueueName());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
new file mode 100644
index 0000000..9df6b74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * Interface for allocating a single stage in IterativePlanner.
+ */
+public interface StageAllocator {
+
+ /**
+ * Computes the allocation of a stage inside a defined time interval.
+ *
+ * @param plan the Plan to which the reservation must be fitted
+ * @param planLoads a 'dirty' read of the plan loads at each time
+ * @param planModifications the allocations performed by the planning
+ * algorithm which are not yet reflected by plan
+ * @param rr the stage
+ * @param stageEarliestStart the arrival time (earliest starting time) set for
+ * the stage by the two phase planning algorithm
+ * @param stageDeadline the deadline of the stage set by the two phase
+ * planning algorithm
+ *
+ * @return The computed allocation (or null if the stage could not be
+ * allocated)
+ */
+ Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
new file mode 100644
index 0000000..773fbdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the rightmost
+ * (latest) free interval.
+ */
+
+public class StageAllocatorGreedy implements StageAllocator {
+
+ @Override
+ public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline) {
+
+ Resource totalCapacity = plan.getTotalCapacity();
+
+ Map<ReservationInterval, Resource> allocationRequests =
+ new HashMap<ReservationInterval, Resource>();
+
+ // compute the gang as a resource and get the duration
+ Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+ long dur = rr.getDuration();
+ long step = plan.getStep();
+
+ // ceil the duration to the next multiple of the plan step
+ if (dur % step != 0) {
+ dur += (step - (dur % step));
+ }
+
+ // we know for sure that this division has no remainder (part of contract
+ // with user, validate before
+ int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+ int maxGang = 0;
+
+ // loop trying to place until we are done, or we are considering
+ // an invalid range of times
+ while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
+
+ // as we run along we remember how many gangs we can fit, and what
+ // was the most constraining moment in time (we will restart just
+ // after that to place the next batch)
+ maxGang = gangsToPlace;
+ long minPoint = stageDeadline;
+ int curMaxGang = maxGang;
+
+ // start placing at deadline (excluded due to [,) interval semantics and
+ // move backward
+ for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
+ && maxGang > 0; t = t - plan.getStep()) {
+
+ // compute net available resources
+ Resource netAvailableRes = Resources.clone(totalCapacity);
+ // Resources.addTo(netAvailableRes, oldResCap);
+ Resources.subtractFrom(netAvailableRes,
+ plan.getTotalCommittedResources(t));
+ Resources.subtractFrom(netAvailableRes,
+ planModifications.getCapacityAtTime(t));
+
+ // compute maximum number of gangs we could fit
+ curMaxGang =
+ (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+ totalCapacity, netAvailableRes, gang));
+
+ // pick the minimum between available resources in this instant, and how
+ // many gangs we have to place
+ curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+ // compare with previous max, and set it. also remember *where* we found
+ // the minimum (useful for next attempts)
+ if (curMaxGang <= maxGang) {
+ maxGang = curMaxGang;
+ minPoint = t;
+ }
+ }
+
+ // if we were able to place any gang, record this, and decrement
+ // gangsToPlace
+ if (maxGang > 0) {
+ gangsToPlace -= maxGang;
+
+ ReservationInterval reservationInt =
+ new ReservationInterval(stageDeadline - dur, stageDeadline);
+ Resource reservationRes =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()
+ * maxGang);
+ // remember occupied space (plan is read-only till we find a plausible
+ // allocation for the entire request). This is needed since we might be
+ // placing other ReservationRequest within the same
+ // ReservationDefinition,
+ // and we must avoid double-counting the available resources
+ planModifications.addInterval(reservationInt, reservationRes);
+ allocationRequests.put(reservationInt, reservationRes);
+
+ }
+
+ // reset our new starting point (curDeadline) to the most constraining
+ // point so far, we will look "left" of that to find more places where
+ // to schedule gangs (for sure nothing on the "right" of this point can
+ // fit a full gang.
+ stageDeadline = minPoint;
+ }
+
+ // if no gangs are left to place we succeed and return the allocation
+ if (gangsToPlace == 0) {
+ return allocationRequests;
+ } else {
+ // If we are here is becasue we did not manage to satisfy this request.
+ // So we need to remove unwanted side-effect from tempAssigned (needed
+ // for ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation
+ : allocationRequests.entrySet()) {
+ planModifications.removeInterval(tempAllocation.getKey(),
+ tempAllocation.getValue());
+ }
+ // and return null to signal failure in this allocation
+ return null;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
new file mode 100644
index 0000000..4b5763d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A stage allocator that iteratively allocates containers in the
+ * {@link DurationInterval} with lowest overall cost. The algorithm only
+ * considers intervals of the form: [stageDeadline - (n+1)*duration,
+ * stageDeadline - n*duration) for an integer n. This guarantees that the
+ * allocations are aligned (as opposed to overlapping duration intervals).
+ *
+ * The smoothnessFactor parameter controls the number of containers that are
+ * simultaneously allocated in each iteration of the algorithm.
+ */
+
+public class StageAllocatorLowCostAligned implements StageAllocator {
+
+ // Smoothness factor
+ private int smoothnessFactor = 10;
+
+ // Constructor
+ public StageAllocatorLowCostAligned() {
+ }
+
+ // Constructor
+ public StageAllocatorLowCostAligned(int smoothnessFactor) {
+ this.smoothnessFactor = smoothnessFactor;
+ }
+
+ // computeJobAllocation()
+ @Override
+ public Map<ReservationInterval, Resource> computeStageAllocation(
+ Plan plan, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline) {
+
+ // Initialize
+ ResourceCalculator resCalc = plan.getResourceCalculator();
+ Resource capacity = plan.getTotalCapacity();
+ long step = plan.getStep();
+
+ // Create allocationRequestsearlies
+ RLESparseResourceAllocation allocationRequests =
+ new RLESparseResourceAllocation(plan.getResourceCalculator(),
+ plan.getMinimumAllocation());
+
+ // Initialize parameters
+ long duration = stepRoundUp(rr.getDuration(), step);
+ int windowSizeInDurations =
+ (int) ((stageDeadline - stageEarliestStart) / duration);
+ int totalGangs = rr.getNumContainers() / rr.getConcurrency();
+ int numContainersPerGang = rr.getConcurrency();
+ Resource gang =
+ Resources.multiply(rr.getCapability(), numContainersPerGang);
+
+ // Set maxGangsPerUnit
+ int maxGangsPerUnit =
+ (int) Math.max(
+ Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+ maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
+
+ // If window size is too small, return null
+ if (windowSizeInDurations <= 0) {
+ return null;
+ }
+
+ // Initialize tree sorted by costs
+ TreeSet<DurationInterval> durationIntervalsSortedByCost =
+ new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
+ @Override
+ public int compare(DurationInterval val1, DurationInterval val2) {
+
+ int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+ }
+ });
+
+ // Add durationIntervals that end at (endTime - n*duration) for some n.
+ for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ + duration; intervalEnd -= duration) {
+
+ long intervalStart = intervalEnd - duration;
+
+ // Get duration interval [intervalStart,intervalEnd)
+ DurationInterval durationInterval =
+ getDurationInterval(intervalStart, intervalEnd, planLoads,
+ planModifications, capacity, resCalc, step);
+
+ // If the interval can fit a gang, add it to the tree
+ if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+ durationIntervalsSortedByCost.add(durationInterval);
+ }
+ }
+
+ // Allocate
+ int remainingGangs = totalGangs;
+ while (remainingGangs > 0) {
+
+ // If no durationInterval can fit a gang, break and return null
+ if (durationIntervalsSortedByCost.isEmpty()) {
+ break;
+ }
+
+ // Get best duration interval
+ DurationInterval bestDurationInterval =
+ durationIntervalsSortedByCost.first();
+ int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
+
+ // Add it
+ remainingGangs -= numGangsToAllocate;
+
+ ReservationInterval reservationInt =
+ new ReservationInterval(bestDurationInterval.getStartTime(),
+ bestDurationInterval.getEndTime());
+
+ Resource reservationRes =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()
+ * numGangsToAllocate);
+
+ planModifications.addInterval(reservationInt, reservationRes);
+ allocationRequests.addInterval(reservationInt, reservationRes);
+
+ // Remove from tree
+ durationIntervalsSortedByCost.remove(bestDurationInterval);
+
+ // Get updated interval
+ DurationInterval updatedDurationInterval =
+ getDurationInterval(bestDurationInterval.getStartTime(),
+ bestDurationInterval.getStartTime() + duration, planLoads,
+ planModifications, capacity, resCalc, step);
+
+ // Add to tree, if possible
+ if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+ durationIntervalsSortedByCost.add(updatedDurationInterval);
+ }
+
+ }
+
+ // Get the final allocation
+ Map<ReservationInterval, Resource> allocations =
+ allocationRequests.toIntervalMap();
+
+ // If no gangs are left to place we succeed and return the allocation
+ if (remainingGangs <= 0) {
+ return allocations;
+ } else {
+
+ // If we are here is because we did not manage to satisfy this request.
+ // We remove unwanted side-effect from planModifications (needed for ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation
+ : allocations.entrySet()) {
+
+ planModifications.removeInterval(tempAllocation.getKey(),
+ tempAllocation.getValue());
+
+ }
+ // Return null to signal failure in this allocation
+ return null;
+
+ }
+
+ }
+
+ protected DurationInterval getDurationInterval(long startTime, long endTime,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc, long step) {
+
+ // Initialize the dominant loads structure
+ Resource dominantResources = Resource.newInstance(0, 0);
+
+ // Calculate totalCost and maxLoad
+ double totalCost = 0.0;
+ for (long t = startTime; t < endTime; t += step) {
+
+ // Get the load
+ Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+ // Increase the total cost
+ totalCost += calcCostOfLoad(load, capacity, resCalc);
+
+ // Update the dominant resources
+ dominantResources = Resources.componentwiseMax(dominantResources, load);
+
+ }
+
+ // Return the corresponding durationInterval
+ return new DurationInterval(startTime, endTime, totalCost,
+ dominantResources);
+
+ }
+
+ protected double calcCostOfInterval(long startTime, long endTime,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc, long step) {
+
+ // Sum costs in the interval [startTime,endTime)
+ double totalCost = 0.0;
+ for (long t = startTime; t < endTime; t += step) {
+ totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
+ resCalc);
+ }
+
+ // Return sum
+ return totalCost;
+
+ }
+
+ protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ // Get the current load at time t
+ Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+ // Return cost
+ return calcCostOfLoad(load, capacity, resCalc);
+
+ }
+
+ protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications) {
+
+ Resource planLoad = planLoads.get(t);
+ planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+
+ return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
+
+ }
+
+ protected double calcCostOfLoad(Resource load, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ return resCalc.ratio(load, capacity);
+
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+
+ /**
+ * An inner class that represents an interval, typically of length duration.
+ * The class holds the total cost of the interval and the maximal load inside
+ * the interval in each dimension (both calculated externally).
+ */
+ protected static class DurationInterval {
+
+ private long startTime;
+ private long endTime;
+ private double cost;
+ private Resource maxLoad;
+
+ // Constructor
+ public DurationInterval(long startTime, long endTime, double cost,
+ Resource maxLoad) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.cost = cost;
+ this.maxLoad = maxLoad;
+ }
+
+ // canAllocate() - boolean function, returns whether requestedResources
+ // can be allocated during the durationInterval without
+ // violating capacity constraints
+ public boolean canAllocate(Resource requestedResources, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
+ return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
+
+ }
+
+ // numCanFit() - returns the maximal number of requestedResources can be
+ // allocated during the durationInterval without violating
+ // capacity constraints
+ public int numCanFit(Resource requestedResources, Resource capacity,
+ ResourceCalculator resCalc) {
+
+ // Represents the largest resource demand that can be satisfied throughout
+ // the entire DurationInterval (i.e., during [startTime,endTime))
+ Resource availableResources = Resources.subtract(capacity, maxLoad);
+
+ // Maximal number of requestedResources that fit inside the interval
+ return (int) Math.floor(Resources.divide(resCalc, capacity,
+ availableResources, requestedResources));
+
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setStartTime(long value) {
+ this.startTime = value;
+ }
+
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ public void setEndTime(long value) {
+ this.endTime = value;
+ }
+
+ public Resource getMaxLoad() {
+ return this.maxLoad;
+ }
+
+ public void setMaxLoad(Resource value) {
+ this.maxLoad = value;
+ }
+
+ public double getTotalCost() {
+ return this.cost;
+ }
+
+ public void setTotalCost(double value) {
+ this.cost = value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
new file mode 100644
index 0000000..547616a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Interface for setting the earliest start time of a stage in IterativePlanner.
+ */
+public interface StageEarliestStart {
+
+ /**
+ * Computes the earliest allowed starting time for a given stage.
+ *
+ * @param plan the Plan to which the reservation must be fitted
+ * @param reservation the job contract
+ * @param index the index of the stage in the job contract
+ * @param currentReservationStage the stage
+ * @param stageDeadline the deadline of the stage set by the two phase
+ * planning algorithm
+ *
+ * @return the earliest allowed starting time for the stage.
+ */
+ long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
+ int index, ReservationRequest currentReservationStage,
+ long stageDeadline);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
new file mode 100644
index 0000000..5a46a4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.ListIterator;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage proportional to the job weight. The
+ * interval [jobArrival, stageDeadline) is divided as follows. First, each stage
+ * is guaranteed at least its requested duration. Then, the stage receives a
+ * fraction of the remaining time. The fraction is calculated as the ratio
+ * between the weight (total requested resources) of the stage and the total
+ * weight of all proceeding stages.
+ */
+
+public class StageEarliestStartByDemand implements StageEarliestStart {
+
+ private long step;
+
+ @Override
+ public long setEarliestStartTime(Plan plan,
+ ReservationDefinition reservation, int index, ReservationRequest current,
+ long stageDeadline) {
+
+ step = plan.getStep();
+
+ // If this is the first stage, don't bother with the computation.
+ if (index < 1) {
+ return reservation.getArrival();
+ }
+
+ // Get iterator
+ ListIterator<ReservationRequest> li =
+ reservation.getReservationRequests().getReservationResources()
+ .listIterator(index);
+ ReservationRequest rr;
+
+ // Calculate the total weight & total duration
+ double totalWeight = calcWeight(current);
+ long totalDuration = getRoundedDuration(current, plan);
+
+ while (li.hasPrevious()) {
+ rr = li.previous();
+ totalWeight += calcWeight(rr);
+ totalDuration += getRoundedDuration(rr, plan);
+ }
+
+ // Compute the weight of the current stage as compared to remaining ones
+ double ratio = calcWeight(current) / totalWeight;
+
+ // Estimate an early start time, such that:
+ // 1. Every stage is guaranteed to receive at least its duration
+ // 2. The remainder of the window is divided between stages
+ // proportionally to its workload (total memory consumption)
+ long window = stageDeadline - reservation.getArrival();
+ long windowRemainder = window - totalDuration;
+ long earlyStart =
+ (long) (stageDeadline - getRoundedDuration(current, plan)
+ - (windowRemainder * ratio));
+
+ // Realign if necessary (since we did some arithmetic)
+ earlyStart = stepRoundUp(earlyStart, step);
+
+ // Return
+ return earlyStart;
+
+ }
+
+ // Weight = total memory consumption of stage
+ protected double calcWeight(ReservationRequest stage) {
+ return (stage.getDuration() * stage.getCapability().getMemory())
+ * (stage.getNumContainers());
+ }
+
+ protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
+ return stepRoundUp(stage.getDuration(), step);
+ }
+
+ protected static long stepRoundDown(long t, long step) {
+ return (t / step) * step;
+ }
+
+ protected static long stepRoundUp(long t, long step) {
+ return ((t + step - 1) / step) * step;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
new file mode 100644
index 0000000..8347816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage as the job arrival time.
+ */
+public class StageEarliestStartByJobArrival implements StageEarliestStart {
+
+ @Override
+ public long setEarliestStartTime(Plan plan,
+ ReservationDefinition reservation, int index, ReservationRequest current,
+ long stageDeadline) {
+
+ return reservation.getArrival();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
new file mode 100644
index 0000000..1d37ce5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * A planning algorithm that invokes several other planning algorithms according
+ * to a given order. If one of the planners succeeds, the allocation it
+ * generates is returned.
+ */
+public class TryManyReservationAgents implements ReservationAgent {
+
+ // Planning algorithms
+ private final List<ReservationAgent> algs;
+
+ // Constructor
+ public TryManyReservationAgents(List<ReservationAgent> algs) {
+ this.algs = new LinkedList<ReservationAgent>(algs);
+ }
+
+ @Override
+ public boolean createReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Save the planning exception
+ PlanningException planningException = null;
+
+ // Try all of the algorithms, in order
+ for (ReservationAgent alg : algs) {
+
+ try {
+ if (alg.createReservation(reservationId, user, plan, contract)) {
+ return true;
+ }
+ } catch (PlanningException e) {
+ planningException = e;
+ }
+
+ }
+
+ // If all of the algorithms failed and one of the algorithms threw an
+ // exception, throw the last planning exception
+ if (planningException != null) {
+ throw planningException;
+ }
+
+ // If all of the algorithms failed, return false
+ return false;
+
+ }
+
+ @Override
+ public boolean updateReservation(ReservationId reservationId, String user,
+ Plan plan, ReservationDefinition contract) throws PlanningException {
+
+ // Save the planning exception
+ PlanningException planningException = null;
+
+ // Try all of the algorithms, in order
+ for (ReservationAgent alg : algs) {
+
+ try {
+ if (alg.updateReservation(reservationId, user, plan, contract)) {
+ return true;
+ }
+ } catch (PlanningException e) {
+ planningException = e;
+ }
+
+ }
+
+ // If all of the algorithms failed and one of the algorithms threw an
+ // exception, throw the last planning exception
+ if (planningException != null) {
+ throw planningException;
+ }
+
+ // If all of the algorithms failed, return false
+ return false;
+
+ }
+
+ @Override
+ public boolean deleteReservation(ReservationId reservationId, String user,
+ Plan plan) throws PlanningException {
+
+ return plan.deleteReservation(reservationId);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index be1d69a..adb9dcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -89,7 +90,7 @@ public class ReservationSystemTestUtil {
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert.assertTrue(
- plan.getReservationAgent() instanceof GreedyReservationAgent);
+ plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert.assertTrue(
plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
@@ -102,7 +103,7 @@ public class ReservationSystemTestUtil {
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
- .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+ .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 19f876d..f608c3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index b8663f6..15f9a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
index f294eaf..4b685b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
index e9a4f50..43316f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;