You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:46:53 UTC
[10/50] [abbrv] hadoop git commit: YARN-4359. Update LowCost agents
logic to take advantage of YARN-4358. (Jonathan Yaniv and Ishai Menache via
Subru).
YARN-4359. Update LowCost agents logic to take advantage of YARN-4358. (Jonathan Yaniv and Ishai Menache via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3a615ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3a615ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3a615ee
Branch: refs/heads/HDFS-7240
Commit: a3a615eeab8c14ccdc548311097e62a916963dc5
Parents: 14b5c93
Author: Subru Krishnan <su...@apache.org>
Authored: Mon May 1 16:01:07 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon May 1 16:01:07 2017 -0700
----------------------------------------------------------------------
.../reservation/InMemoryPlan.java | 11 +
.../resourcemanager/reservation/PlanView.java | 9 +
.../planning/AlignedPlannerWithGreedy.java | 15 +-
.../planning/GreedyReservationAgent.java | 13 +-
.../reservation/planning/IterativePlanner.java | 196 ++++-----
.../reservation/planning/ReservationAgent.java | 23 +-
.../planning/SimpleCapacityReplanner.java | 8 +-
.../reservation/planning/StageAllocator.java | 10 +-
.../planning/StageAllocatorGreedy.java | 4 +-
.../planning/StageAllocatorGreedyRLE.java | 4 +-
.../planning/StageAllocatorLowCostAligned.java | 279 +++++++++----
.../planning/StageEarliestStart.java | 46 ---
.../planning/StageEarliestStartByDemand.java | 106 -----
.../StageEarliestStartByJobArrival.java | 39 --
.../planning/StageExecutionInterval.java | 47 +++
.../StageExecutionIntervalByDemand.java | 144 +++++++
.../StageExecutionIntervalUnconstrained.java | 73 ++++
.../planning/TestAlignedPlanner.java | 411 +++++++++++++++++--
.../planning/TestGreedyReservationAgent.java | 8 +-
.../planning/TestSimpleCapacityReplanner.java | 4 +-
20 files changed, 987 insertions(+), 463 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 3afcd47..783fd09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan {
readLock.unlock();
}
}
+
+ @Override
+ public RLESparseResourceAllocation getCumulativeLoadOverTime(
+ long start, long end) {
+ readLock.lock();
+ try {
+ return rleSparseVector.getRangeOverlapping(start, end);
+ } finally {
+ readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 699f461..2767993 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -174,4 +174,13 @@ public interface PlanView extends PlanContext {
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end);
+ /**
+ * Get the cumulative load over a time interval.
+ *
+ * @param start Start of the time interval.
+ * @param end End of the time interval.
+ * @return RLE sparse allocation.
+ */
+ RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
index 00c2333..3853f41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
public static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
public static final String SMOOTHNESS_FACTOR =
"yarn.resourcemanager.reservation-system.smoothness-factor";
+ private boolean allocateLeft = false;
+
// Log
private static final Logger LOG = LoggerFactory
@@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
// Constructor
public AlignedPlannerWithGreedy() {
+
}
@Override
public void init(Configuration conf) {
int smoothnessFactor =
conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR);
+ allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
+ DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
// List of algorithms
List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
// LowCostAligned planning algorithm
ReservationAgent algAligned =
- new IterativePlanner(new StageEarliestStartByDemand(),
- new StageAllocatorLowCostAligned(smoothnessFactor), false);
+ new IterativePlanner(new StageExecutionIntervalByDemand(),
+ new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft),
+ allocateLeft);
+
listAlg.add(algAligned);
// Greedy planning algorithm
ReservationAgent algGreedy =
- new IterativePlanner(new StageEarliestStartByJobArrival(),
- new StageAllocatorGreedy(), false);
+ new IterativePlanner(new StageExecutionIntervalUnconstrained(),
+ new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
listAlg.add(algGreedy);
// Set planner:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
index 1559b97..637a17b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -47,9 +47,6 @@ public class GreedyReservationAgent implements ReservationAgent {
// Greedy planner
private ReservationAgent planner;
- public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
- "yarn.resourcemanager.reservation-system.favor-early-allocation";
- public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
private boolean allocateLeft;
public GreedyReservationAgent() {
@@ -57,20 +54,20 @@ public class GreedyReservationAgent implements ReservationAgent {
@Override
public void init(Configuration conf) {
- allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
+ allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
if (allocateLeft) {
LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
+ " (left) allocations (controlled by parameter: "
- + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+ + FAVOR_EARLY_ALLOCATION + ")");
} else {
LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
+ " (right) allocations (controlled by parameter: "
- + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+ + FAVOR_EARLY_ALLOCATION + ")");
}
planner =
- new IterativePlanner(new StageEarliestStartByJobArrival(),
+ new IterativePlanner(new StageExecutionIntervalUnconstrained(),
new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
}
@@ -123,4 +120,4 @@ public class GreedyReservationAgent implements ReservationAgent {
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 24d237a..83f272e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.ListIterator;
import java.util.Map;
@@ -32,26 +31,24 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A planning algorithm consisting of two main phases. The algorithm iterates
- * over the job stages in descending order. For each stage, the algorithm: 1.
- * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
- * is allocated. 2. Computes an allocation for the stage inside the interval.
- *
- * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
- * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
- * each stage is set as succcessorStartTime - the starting time of its
- * succeeding stage (or jobDeadline if it is the last stage).
- *
- * The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
- * setAlgComputeStageAllocation
+ * over the job stages in ascending/descending order, depending on the flag
+ * allocateLeft. For each stage, the algorithm: 1. Determines an interval
+ * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an
+ * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
+ * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For
+ * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
+ * successorStartTime - the starting time of its succeeding stage (or
+ * jobDeadline if it is the last stage). The phases are set using the two
+ * functions: 1. setAlgStageExecutionInterval 2. setAlgStageAllocator
*/
public class IterativePlanner extends PlanningAlgorithm {
@@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm {
private RLESparseResourceAllocation planModifications;
// Data extracted from plan
- private Map<Long, Resource> planLoads;
+ private RLESparseResourceAllocation planLoads;
private Resource capacity;
private long step;
@@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm {
private long jobDeadline;
// Phase algorithms
- private StageEarliestStart algStageEarliestStart = null;
+ private StageExecutionInterval algStageExecutionInterval = null;
private StageAllocator algStageAllocator = null;
private final boolean allocateLeft;
// Constructor
- public IterativePlanner(StageEarliestStart algEarliestStartTime,
+ public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
StageAllocator algStageAllocator, boolean allocateLeft) {
this.allocateLeft = allocateLeft;
- setAlgStageEarliestStart(algEarliestStartTime);
+ setAlgStageExecutionInterval(algStageExecutionInterval);
setAlgStageAllocator(algStageAllocator);
}
@@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm {
// Current stage
ReservationRequest currentReservationStage;
- // Stage deadlines
- long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
- long successorStartingTime = -1;
- long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
- long stageArrivalTime = -1;
-
// Iterate the stages in reverse order
while (stageProvider.hasNext()) {
@@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm {
// Validate that the ReservationRequest respects basic constraints
validateInputStage(plan, currentReservationStage);
- // Compute an adjusted earliestStart for this resource
- // (we need this to provision some space for the ORDER contracts)
+ // Set the stageArrival and stageDeadline
+ ReservationInterval stageInterval =
+ setStageExecutionInterval(plan, reservation, currentReservationStage,
+ allocations);
+ Long stageArrival = stageInterval.getStartTime();
+ Long stageDeadline = stageInterval.getEndTime();
- if (allocateLeft) {
- stageArrivalTime = predecessorEndTime;
- } else {
- stageArrivalTime = reservation.getArrival();
- if (jobType == ReservationRequestInterpreter.R_ORDER
- || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
- stageArrivalTime =
- computeEarliestStartingTime(plan, reservation,
- stageProvider.getCurrentIndex(), currentReservationStage,
- stageDeadline);
- }
- stageArrivalTime = stepRoundUp(stageArrivalTime, step);
- stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
- }
- // Compute the allocation of a single stage
+ // Compute stage allocation
Map<ReservationInterval, Resource> curAlloc =
- computeStageAllocation(plan, currentReservationStage,
- stageArrivalTime, stageDeadline, user, reservationId);
+ computeStageAllocation(plan, currentReservationStage, stageArrival,
+ stageDeadline, user, reservationId);
// If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue).
@@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm {
}
- // Get the start & end time of the current allocation
- Long stageStartTime = findEarliestTime(curAlloc);
- Long stageEndTime = findLatestTime(curAlloc);
+ // Validate ORDER_NO_GAP
+ if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+ if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
+ throw new PlanningException(
+ "The allocation found does not respect ORDER_NO_GAP");
+ }
+ }
// If we did find an allocation for the stage, add it
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm {
if (jobType == ReservationRequestInterpreter.R_ANY) {
break;
}
-
- // If ORDER job, set the stageDeadline of the next stage to be processed
- if (jobType == ReservationRequestInterpreter.R_ORDER
- || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-
- // CHECK ORDER_NO_GAP
- // Verify that there is no gap, in case the job is ORDER_NO_GAP
- // note that the test is different left-to-right and right-to-left
- if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
- && successorStartingTime != -1
- && ((allocateLeft && predecessorEndTime < stageStartTime) ||
- (!allocateLeft && (stageEndTime < successorStartingTime))
- )
- || (!isNonPreemptiveAllocation(curAlloc))) {
- throw new PlanningException(
- "The allocation found does not respect ORDER_NO_GAP");
- }
-
- if (allocateLeft) {
- // Store the stageStartTime and set the new stageDeadline
- predecessorEndTime = stageEndTime;
- } else {
- // Store the stageStartTime and set the new stageDeadline
- successorStartingTime = stageStartTime;
- stageDeadline = stageStartTime;
- }
- }
}
// If the allocation is empty, return an error
@@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm {
}
return allocations;
+ }
+ protected static boolean validateOrderNoGap(
+ RLESparseResourceAllocation allocations,
+ Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {
+
+ // Left to right
+ if (allocateLeft) {
+ Long stageStartTime = findEarliestTime(curAlloc);
+ Long allocationEndTime = allocations.getLatestNonNullTime();
+
+ // Check that there is no gap between stages
+ if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) {
+ return false;
+ }
+ // Right to left
+ } else {
+ Long stageEndTime = findLatestTime(curAlloc);
+ Long allocationStartTime = allocations.getEarliestStartTime();
+
+ // Check that there is no gap between stages
+ if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) {
+ return false;
+ }
+ }
+
+ // Check that the stage allocation does not violate ORDER_NO_GAP
+ if (!isNonPreemptiveAllocation(curAlloc)) {
+ return false;
+ }
+
+ // The allocation is legal
+ return true;
}
protected void initialize(Plan plan, ReservationId reservationId,
@@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm {
// planLoads are not used by other StageAllocators... and don't deal
// well with huge reservation ranges
- if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
- planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
- ReservationAllocation oldRes = plan.getReservationById(reservationId);
- if (oldRes != null) {
- planModifications =
- RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
- plan.getTotalCapacity(), planModifications,
- oldRes.getResourcesOverTime(), RLEOperator.subtract,
- jobArrival, jobDeadline);
- }
+ planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
+ ReservationAllocation oldRes = plan.getReservationById(reservationId);
+ if (oldRes != null) {
+ planLoads =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), planLoads,
+ oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
+ jobDeadline);
}
-
- }
-
- private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
- long endTime) {
-
- // Create map
- Map<Long, Resource> loads = new HashMap<Long, Resource>();
-
- // Calculate the load for every time slot between [start,end)
- for (long t = startTime; t < endTime; t += step) {
- Resource load = plan.getTotalCommittedResources(t);
- loads.put(t, load);
- }
-
- // Return map
- return loads;
-
}
private void validateInputStage(Plan plan, ReservationRequest rr)
@@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm {
}
- private boolean isNonPreemptiveAllocation(
+ private static boolean isNonPreemptiveAllocation(
Map<ReservationInterval, Resource> curAlloc) {
// Checks whether a stage allocation is non preemptive or not.
@@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm {
}
- // Call algEarliestStartTime()
- protected long computeEarliestStartingTime(Plan plan,
- ReservationDefinition reservation, int index,
- ReservationRequest currentReservationStage, long stageDeadline) {
-
- return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
- currentReservationStage, stageDeadline);
-
+ // Call algStageExecutionInterval
+ protected ReservationInterval setStageExecutionInterval(Plan plan,
+ ReservationDefinition reservation,
+ ReservationRequest currentReservationStage,
+ RLESparseResourceAllocation allocations) {
+ return algStageExecutionInterval.computeExecutionInterval(plan,
+ reservation, currentReservationStage, allocateLeft, allocations);
}
// Call algStageAllocator
@@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm {
}
- // Set the algorithm: algStageEarliestStart
- public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+ // Set the algorithm: algStageExecutionInterval
+ public IterativePlanner setAlgStageExecutionInterval(
+ StageExecutionInterval alg) {
- this.algStageEarliestStart = alg;
+ this.algStageExecutionInterval = alg;
return this; // To allow concatenation of setAlg() functions
}
@@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm {
private final boolean allocateLeft;
- private ListIterator<ReservationRequest> li;
+ private final ListIterator<ReservationRequest> li;
public StageProvider(boolean allocateLeft,
ReservationDefinition reservation) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
index 52e7055..3c448b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -29,14 +29,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
public interface ReservationAgent {
/**
+ * Constant defining the preferential treatment of time for equally valid
+ * allocations.
+ */
+ final static String FAVOR_EARLY_ALLOCATION =
+ "yarn.resourcemanager.reservation-system.favor-early-allocation";
+ /**
+ * By default favor early allocations.
+ */
+ final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
+
+ /**
* Create a reservation for the user that abides by the specified contract
- *
+ *
* @param reservationId the identifier of the reservation to be created.
* @param user the user who wants to create the reservation
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* session
- *
+ *
* @return whether the create operation was successful or not
* @throws PlanningException if the session cannot be fitted into the plan
*/
@@ -45,13 +56,13 @@ public interface ReservationAgent {
/**
* Update a reservation for the user that abides by the specified contract
- *
+ *
* @param reservationId the identifier of the reservation to be updated
* @param user the user who wants to create the session
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* reservation
- *
+ *
* @return whether the update operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
@@ -60,11 +71,11 @@ public interface ReservationAgent {
/**
* Delete an user reservation
- *
+ *
* @param reservationId the identifier of the reservation to be deleted
* @param user the user who wants to create the reservation
* @param plan the Plan to which the session must be fitted
- *
+ *
* @return whether the delete operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
index 7507783..7bfc730 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
* This (re)planner scan a period of time from now to a maximum time window (or
* the end of the last session, whichever comes first) checking the overall
* capacity is not violated.
- *
+ *
* It greedily removes sessions in reversed order of acceptance (latest accepted
* is the first removed).
*/
@@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner {
// loop on all moment in time from now to the end of the check Zone
// or the end of the planned sessions whichever comes first
- for (long t = now;
- (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
+ for (long t = now;
+ (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
t += plan.getStep()) {
Resource excessCap =
Resources.subtract(plan.getTotalCommittedResources(t), totCap);
@@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner {
new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
for (Iterator<ReservationAllocation> resIter =
curReservations.iterator(); resIter.hasNext()
- && Resources.greaterThan(resCalc, totCap, excessCap,
+ && Resources.greaterThan(resCalc, totCap, excessCap,
ZERO_RESOURCE);) {
ReservationAllocation reservation = resIter.next();
plan.deleteReservation(reservation.getReservationId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index b95f8d4..ec6d9c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -41,19 +41,21 @@ public interface StageAllocator {
* @param planModifications the allocations performed by the planning
* algorithm which are not yet reflected by plan
* @param rr the stage
- * @param stageEarliestStart the arrival time (earliest starting time) set for
+ * @param stageArrival the arrival time (earliest starting time) set for
* the stage by the two phase planning algorithm
* @param stageDeadline the deadline of the stage set by the two phase
* planning algorithm
+ * @param user name of the user
+ * @param oldId identifier of the old reservation
*
* @return The computed allocation (or null if the stage could not be
* allocated)
* @throws PlanningException
*/
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
- Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
- long stageEarliestStart, long stageDeadline, String user,
+ long stageArrival, long stageDeadline, String user,
ReservationId oldId) throws PlanningException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index c836970..da04336 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator {
@Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
- Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index 5e748fc..ec83e02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
@Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
- Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index b9fd8e1..e45f58c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -18,8 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -27,46 +31,55 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A stage allocator that iteratively allocates containers in the
* {@link DurationInterval} with lowest overall cost. The algorithm only
- * considers intervals of the form: [stageDeadline - (n+1)*duration,
- * stageDeadline - n*duration) for an integer n. This guarantees that the
- * allocations are aligned (as opposed to overlapping duration intervals).
- *
- * The smoothnessFactor parameter controls the number of containers that are
- * simultaneously allocated in each iteration of the algorithm.
+ * considers non-overlapping intervals of length 'duration'. This guarantees
+ * that the allocations are aligned. If 'allocateLeft == true', the intervals
+ * considered by the algorithm are aligned to stageArrival; otherwise, they are
+ * aligned to stageDeadline. The smoothnessFactor parameter controls the number
+ * of containers that are simultaneously allocated in each iteration of the
+ * algorithm.
*/
public class StageAllocatorLowCostAligned implements StageAllocator {
+ private final boolean allocateLeft;
// Smoothness factor
private int smoothnessFactor = 10;
// Constructor
- public StageAllocatorLowCostAligned() {
+ public StageAllocatorLowCostAligned(boolean allocateLeft) {
+ this.allocateLeft = allocateLeft;
}
// Constructor
- public StageAllocatorLowCostAligned(int smoothnessFactor) {
+ public StageAllocatorLowCostAligned(int smoothnessFactor,
+ boolean allocateLeft) {
+ this.allocateLeft = allocateLeft;
this.smoothnessFactor = smoothnessFactor;
}
- // computeJobAllocation()
@Override
- public Map<ReservationInterval, Resource> computeStageAllocation(
- Plan plan, Map<Long, Resource> planLoads,
+ public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
- long stageEarliestStart, long stageDeadline, String user,
- ReservationId oldId) {
+ long stageArrival, long stageDeadline, String user, ReservationId oldId)
+ throws PlanningException {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
+
+ RLESparseResourceAllocation netRLERes = plan
+ .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
+
long step = plan.getStep();
// Create allocationRequestsearlies
@@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
// Initialize parameters
long duration = stepRoundUp(rr.getDuration(), step);
int windowSizeInDurations =
- (int) ((stageDeadline - stageEarliestStart) / duration);
+ (int) ((stageDeadline - stageArrival) / duration);
int totalGangs = rr.getNumContainers() / rr.getConcurrency();
int numContainersPerGang = rr.getConcurrency();
Resource gang =
Resources.multiply(rr.getCapability(), numContainersPerGang);
// Set maxGangsPerUnit
- int maxGangsPerUnit =
- (int) Math.max(
- Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+ int maxGangsPerUnit = (int) Math
+ .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
// If window size is too small, return null
@@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return null;
}
+ final int preferLeft = allocateLeft ? 1 : -1;
+
// Initialize tree sorted by costs
TreeSet<DurationInterval> durationIntervalsSortedByCost =
new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
@@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return cmp;
}
- return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+ return preferLeft
+ * Long.compare(val1.getEndTime(), val2.getEndTime());
}
});
+ List<Long> intervalEndTimes =
+ computeIntervalEndTimes(stageArrival, stageDeadline, duration);
+
// Add durationIntervals that end at (endTime - n*duration) for some n.
- for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
- + duration; intervalEnd -= duration) {
+ for (long intervalEnd : intervalEndTimes) {
long intervalStart = intervalEnd - duration;
// Get duration interval [intervalStart,intervalEnd)
DurationInterval durationInterval =
getDurationInterval(intervalStart, intervalEnd, planLoads,
- planModifications, capacity, resCalc, step);
+ planModifications, capacity, netRLERes, resCalc, step, gang);
// If the interval can fit a gang, add it to the tree
- if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+ if (durationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(durationInterval);
}
}
@@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
durationIntervalsSortedByCost.first();
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
numGangsToAllocate =
- Math.min(numGangsToAllocate,
- bestDurationInterval.numCanFit(gang, capacity, resCalc));
+ Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
// Add it
remainingGangs -= numGangsToAllocate;
@@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
new ReservationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getEndTime());
- Resource reservationRes =
- Resources.multiply(rr.getCapability(), rr.getConcurrency()
- * numGangsToAllocate);
+ Resource reservationRes = Resources.multiply(rr.getCapability(),
+ rr.getConcurrency() * numGangsToAllocate);
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.addInterval(reservationInt, reservationRes);
@@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
DurationInterval updatedDurationInterval =
getDurationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getStartTime() + duration, planLoads,
- planModifications, capacity, resCalc, step);
+ planModifications, capacity, netRLERes, resCalc, step, gang);
// Add to tree, if possible
- if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+ if (updatedDurationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(updatedDurationInterval);
}
@@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return allocations;
} else {
- // If we are here is because we did not manage to satisfy this request.
- // We remove unwanted side-effect from planModifications (needed for ANY).
- for (Map.Entry<ReservationInterval, Resource> tempAllocation
- : allocations.entrySet()) {
+ // If we are here, it is because we did not manage to satisfy this
+ // request.
+ // We remove unwanted side effects from planModifications (needed for
+ // ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations
+ .entrySet()) {
planModifications.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
@@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
}
- protected DurationInterval getDurationInterval(long startTime, long endTime,
- Map<Long, Resource> planLoads,
+ private List<Long> computeIntervalEndTimes(long stageEarliestStart,
+ long stageDeadline, long duration) {
+
+ List<Long> intervalEndTimes = new ArrayList<Long>();
+ if (!allocateLeft) {
+ for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ + duration; intervalEnd -= duration) {
+ intervalEndTimes.add(intervalEnd);
+ }
+ } else {
+ for (long intervalStart =
+ stageEarliestStart; intervalStart <= stageDeadline
+ - duration; intervalStart += duration) {
+ intervalEndTimes.add(intervalStart + duration);
+ }
+ }
+
+ return intervalEndTimes;
+ }
+
+ protected static DurationInterval getDurationInterval(long startTime,
+ long endTime, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
- ResourceCalculator resCalc, long step) {
+ RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
+ long step, Resource requestedResources) throws PlanningException {
- // Initialize the dominant loads structure
- Resource dominantResources = Resource.newInstance(0, 0);
+ // Get the total cost associated with the duration interval
+ double totalCost = getDurationIntervalTotalCost(startTime, endTime,
+ planLoads, planModifications, capacity, resCalc, step);
- // Calculate totalCost and maxLoad
- double totalCost = 0.0;
- for (long t = startTime; t < endTime; t += step) {
+ // Calculate how many gangs can fit, i.e., how many times the requested
+ // resources can be allocated within the interval [startTime, endTime)
+ int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
+ planModifications, capacity, netRLERes, resCalc, requestedResources);
+
+ // Return the desired durationInterval
+ return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
+
+ }
+
+ protected static double getDurationIntervalTotalCost(long startTime,
+ long endTime, RLESparseResourceAllocation planLoads,
+ RLESparseResourceAllocation planModifications, Resource capacity,
+ ResourceCalculator resCalc, long step) throws PlanningException {
+
+ // Compute the current resource load within the interval [startTime,endTime)
+ // by adding planLoads (existing load) and planModifications (load that
+ // corresponds to the current job).
+ RLESparseResourceAllocation currentLoad =
+ RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
+ planModifications, RLEOperator.add, startTime, endTime);
+
+ // Convert load from RLESparseResourceAllocation to a Map representation
+ NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();
- // Get the load
- Resource load = getLoadAtTime(t, planLoads, planModifications);
+ // Initialize auxiliary variables
+ double totalCost = 0.0;
+ Long tPrev = -1L;
+ Resource loadPrev = Resources.none();
+ double cost = 0.0;
+
+ // Iterate over time points. For each point 't', accumulate the total cost
+ // that corresponds to the interval [tPrev, t). The cost associated with
+ // this interval is fixed for each of the time steps, therefore the cost of
+ // a single step is multiplied by (t - tPrev) / step.
+ for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
+ Long t = e.getKey();
+ Resource load = e.getValue();
+ if (tPrev != -1L) {
+ tPrev = Math.max(tPrev, startTime);
+ cost = calcCostOfLoad(loadPrev, capacity, resCalc);
+ totalCost = totalCost + cost * (t - tPrev) / step;
+ }
- // Increase the total cost
- totalCost += calcCostOfLoad(load, capacity, resCalc);
+ tPrev = t;
+ loadPrev = load;
+ }
- // Update the dominant resources
- dominantResources = Resources.componentwiseMax(dominantResources, load);
+ // Add the cost associated with the last interval (the for loop does not
+ // calculate it).
+ if (loadPrev != null) {
+ // This takes care of the corner case of a single entry
+ tPrev = Math.max(tPrev, startTime);
+ cost = calcCostOfLoad(loadPrev, capacity, resCalc);
+ totalCost = totalCost + cost * (endTime - tPrev) / step;
}
- // Return the corresponding durationInterval
- return new DurationInterval(startTime, endTime, totalCost,
- dominantResources);
+ // Return the overall cost
+ return totalCost;
+ }
+
+ protected static int getDurationIntervalGangsCanFit(long startTime,
+ long endTime, RLESparseResourceAllocation planModifications,
+ Resource capacity, RLESparseResourceAllocation netRLERes,
+ ResourceCalculator resCalc, Resource requestedResources)
+ throws PlanningException {
+
+ // Initialize auxiliary variables
+ int gangsCanFit = Integer.MAX_VALUE;
+ int curGangsCanFit;
+
+ // Calculate the total amount of available resources between startTime
+ // and endTime, by subtracting planModifications from netRLERes
+ RLESparseResourceAllocation netAvailableResources =
+ RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
+ planModifications, RLEOperator.subtractTestNonNegative, startTime,
+ endTime);
+
+ // Convert result to a map
+ NavigableMap<Long, Resource> mapAvailableCapacity =
+ netAvailableResources.getCumulative();
+
+ // Iterate over the map representation.
+ // At each point, calculate how many times 'requestedResources' fits.
+ // The result is the minimum over all time points.
+ for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
+ Long t = e.getKey();
+ Resource curAvailable = e.getValue();
+ if (t >= endTime) {
+ break;
+ }
+ if (curAvailable == null) {
+ gangsCanFit = 0;
+ } else {
+ curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity,
+ curAvailable, requestedResources));
+ if (curGangsCanFit < gangsCanFit) {
+ gangsCanFit = curGangsCanFit;
+ }
+ }
+ }
+ return gangsCanFit;
}
protected double calcCostOfInterval(long startTime, long endTime,
- Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) {
@@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
}
- protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+ protected double calcCostOfTimeSlot(long t,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc) {
@@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
}
- protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+ protected Resource getLoadAtTime(long t,
+ RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications) {
- Resource planLoad = planLoads.get(t);
- planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+ Resource planLoad = planLoads.getCapacityAtTime(t);
return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
}
- protected double calcCostOfLoad(Resource load, Resource capacity,
+ protected static double calcCostOfLoad(Resource load, Resource capacity,
ResourceCalculator resCalc) {
return resCalc.ratio(load, capacity);
@@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
private long startTime;
private long endTime;
private double cost;
- private Resource maxLoad;
+ private final int gangsCanFit;
// Constructor
public DurationInterval(long startTime, long endTime, double cost,
- Resource maxLoad) {
+ int gangsCanfit) {
this.startTime = startTime;
this.endTime = endTime;
this.cost = cost;
- this.maxLoad = maxLoad;
+ this.gangsCanFit = gangsCanfit;
}
// canAllocate() - boolean function, returns whether requestedResources
// can be allocated during the durationInterval without
// violating capacity constraints
- public boolean canAllocate(Resource requestedResources, Resource capacity,
- ResourceCalculator resCalc) {
-
- Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
- return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
-
+ public boolean canAllocate() {
+ return (gangsCanFit > 0);
}
// numCanFit() - returns the maximal number of requestedResources can be
// allocated during the durationInterval without violating
// capacity constraints
- public int numCanFit(Resource requestedResources, Resource capacity,
- ResourceCalculator resCalc) {
-
- // Represents the largest resource demand that can be satisfied throughout
- // the entire DurationInterval (i.e., during [startTime,endTime))
- Resource availableResources = Resources.subtract(capacity, maxLoad);
-
- // Maximal number of requestedResources that fit inside the interval
- return (int) Math.floor(Resources.divide(resCalc, capacity,
- availableResources, requestedResources));
+ public int numCanFit() {
+ return gangsCanFit;
}
public long getStartTime() {
@@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.endTime = value;
}
- public Resource getMaxLoad() {
- return this.maxLoad;
- }
-
- public void setMaxLoad(Resource value) {
- this.maxLoad = value;
- }
-
public double getTotalCost() {
return this.cost;
}
@@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.cost = value;
}
+ @Override
public String toString() {
+
StringBuilder sb = new StringBuilder();
+
sb.append(" start: " + startTime).append(" end: " + endTime)
- .append(" cost: " + cost).append(" maxLoad: " + maxLoad);
+ .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);
+
return sb.toString();
+
}
+
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
deleted file mode 100644
index 547616a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Interface for setting the earliest start time of a stage in IterativePlanner.
- */
-public interface StageEarliestStart {
-
- /**
- * Computes the earliest allowed starting time for a given stage.
- *
- * @param plan the Plan to which the reservation must be fitted
- * @param reservation the job contract
- * @param index the index of the stage in the job contract
- * @param currentReservationStage the stage
- * @param stageDeadline the deadline of the stage set by the two phase
- * planning algorithm
- *
- * @return the earliest allowed starting time for the stage.
- */
- long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
- int index, ReservationRequest currentReservationStage,
- long stageDeadline);
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
deleted file mode 100644
index 43d6584..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import java.util.ListIterator;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Sets the earliest start time of a stage proportional to the job weight. The
- * interval [jobArrival, stageDeadline) is divided as follows. First, each stage
- * is guaranteed at least its requested duration. Then, the stage receives a
- * fraction of the remaining time. The fraction is calculated as the ratio
- * between the weight (total requested resources) of the stage and the total
- * weight of all proceeding stages.
- */
-
-public class StageEarliestStartByDemand implements StageEarliestStart {
-
- private long step;
-
- @Override
- public long setEarliestStartTime(Plan plan,
- ReservationDefinition reservation, int index, ReservationRequest current,
- long stageDeadline) {
-
- step = plan.getStep();
-
- // If this is the first stage, don't bother with the computation.
- if (index < 1) {
- return reservation.getArrival();
- }
-
- // Get iterator
- ListIterator<ReservationRequest> li =
- reservation.getReservationRequests().getReservationResources()
- .listIterator(index);
- ReservationRequest rr;
-
- // Calculate the total weight & total duration
- double totalWeight = calcWeight(current);
- long totalDuration = getRoundedDuration(current, plan);
-
- while (li.hasPrevious()) {
- rr = li.previous();
- totalWeight += calcWeight(rr);
- totalDuration += getRoundedDuration(rr, plan);
- }
-
- // Compute the weight of the current stage as compared to remaining ones
- double ratio = calcWeight(current) / totalWeight;
-
- // Estimate an early start time, such that:
- // 1. Every stage is guaranteed to receive at least its duration
- // 2. The remainder of the window is divided between stages
- // proportionally to its workload (total memory consumption)
- long window = stageDeadline - reservation.getArrival();
- long windowRemainder = window - totalDuration;
- long earlyStart =
- (long) (stageDeadline - getRoundedDuration(current, plan)
- - (windowRemainder * ratio));
-
- // Realign if necessary (since we did some arithmetic)
- earlyStart = stepRoundUp(earlyStart, step);
-
- // Return
- return earlyStart;
-
- }
-
- // Weight = total memory consumption of stage
- protected double calcWeight(ReservationRequest stage) {
- return (stage.getDuration() * stage.getCapability().getMemorySize())
- * (stage.getNumContainers());
- }
-
- protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
- return stepRoundUp(stage.getDuration(), step);
- }
-
- protected static long stepRoundDown(long t, long step) {
- return (t / step) * step;
- }
-
- protected static long stepRoundUp(long t, long step) {
- return ((t + step - 1) / step) * step;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
deleted file mode 100644
index 8347816..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Sets the earliest start time of a stage as the job arrival time.
- */
-public class StageEarliestStartByJobArrival implements StageEarliestStart {
-
- @Override
- public long setEarliestStartTime(Plan plan,
- ReservationDefinition reservation, int index, ReservationRequest current,
- long stageDeadline) {
-
- return reservation.getArrival();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
new file mode 100644
index 0000000..8f7f5f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * An auxiliary interface used to compute the time interval in which the stage
+ * can be allocated resources by {@link IterativePlanner}.
+ */
+public interface StageExecutionInterval {
+ /**
+ * Computes the time interval in which a given stage may be allocated.
+ *
+ * @param plan the Plan to which the reservation must be fitted
+ * @param reservation the job contract
+ * @param currentReservationStage the stage whose interval is computed
+ * @param allocateLeft is the job allocated from left to right
+ * @param allocations Existing resource assignments for the job
+ * @return the time interval in which the stage can get resources.
+ */
+ ReservationInterval computeExecutionInterval(Plan plan,
+ ReservationDefinition reservation,
+ ReservationRequest currentReservationStage, boolean allocateLeft,
+ RLESparseResourceAllocation allocations);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
new file mode 100644
index 0000000..95f1d4b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider;
+
+/**
+ * An implementation of {@link StageExecutionInterval}, which sets the execution
+ * interval of the stage. For ANY and ALL jobs, the interval is the one given by
+ * {@link StageExecutionIntervalUnconstrained}. For ORDER jobs, that maximal
+ * interval is divided as follows: First, each stage is guaranteed at least its
+ * requested duration. Then, the stage receives a fraction of the remaining
+ * time. The fraction is calculated as the ratio between the weight (total
+ * requested resources) of the stage and the total weight of all remaining
+ * stages. The class keeps no mutable state of its own.
+ */
+public class StageExecutionIntervalByDemand implements StageExecutionInterval {
+
+  /**
+   * Computes the sub-interval of the maximal interval given to the stage.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param reservation the job contract
+   * @param currentReservationStage the stage whose interval is computed
+   * @param allocateLeft is the job allocated from left to right
+   * @param allocations existing resource assignments for the job
+   * @return the time interval in which the stage can get resources
+   */
+  @Override
+  public ReservationInterval computeExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage, boolean allocateLeft,
+      RLESparseResourceAllocation allocations) {
+    // Use StageExecutionIntervalUnconstrained to get the maximal interval
+    ReservationInterval maxInterval =
+        (new StageExecutionIntervalUnconstrained()).computeExecutionInterval(
+            plan, reservation, currentReservationStage, allocateLeft,
+            allocations);
+
+    ReservationRequestInterpreter jobType =
+        reservation.getReservationRequests().getInterpreter();
+
+    // ALL & ANY jobs are unconstrained: use the maximal interval as is
+    if ((jobType != ReservationRequestInterpreter.R_ORDER)
+        && (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
+      return maxInterval;
+    }
+
+    // For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval
+    final long step = plan.getStep();
+
+    double totalWeight = 0.0;
+    long totalDuration = 0;
+
+    // Iterate over the stages that haven't been allocated.
+    // For allocateLeft == true, we iterate in reverse order, starting from
+    // the last stage, until we reach the current stage.
+    // For allocateLeft == false, we do the opposite.
+    StageProvider stageProvider = new StageProvider(!allocateLeft, reservation);
+
+    while (stageProvider.hasNext()) {
+      ReservationRequest rr = stageProvider.next();
+      totalWeight += calcWeight(rr);
+      totalDuration += getRoundedDuration(rr, step);
+
+      // Stop once we reach current (compared by identity, not equals)
+      if (rr == currentReservationStage) {
+        break;
+      }
+    }
+
+    // Compute the weight of the current stage as compared to remaining ones.
+    // With zero total demand, 0/0 would be NaN, and the (long) casts below
+    // turn NaN into 0, collapsing the bound to time 0; use a zero share.
+    double ratio = (totalWeight > 0.0)
+        ? calcWeight(currentReservationStage) / totalWeight : 0.0;
+
+    // Estimate the interval, such that:
+    // 1. Every stage is guaranteed to receive at least its duration
+    // 2. The remainder of the window is divided between stages
+    //    proportionally to its workload (total memory consumption)
+    long maxIntervalArrival = maxInterval.getStartTime();
+    long maxIntervalDeadline = maxInterval.getEndTime();
+    long window = maxIntervalDeadline - maxIntervalArrival;
+    long windowRemainder = window - totalDuration;
+    long currentDuration = getRoundedDuration(currentReservationStage, step);
+
+    // Bounds are realigned to the plan step (since we did some arithmetic)
+    if (allocateLeft) {
+      long latestEnd = (long) (maxIntervalArrival + currentDuration
+          + (windowRemainder * ratio));
+      return new ReservationInterval(maxIntervalArrival,
+          stepRoundDown(latestEnd, step));
+    } else {
+      long earlyStart = (long) (maxIntervalDeadline - currentDuration
+          - (windowRemainder * ratio));
+      return new ReservationInterval(stepRoundUp(earlyStart, step),
+          maxIntervalDeadline);
+    }
+  }
+
+  /** Weight = total memory consumption; double math avoids long overflow. */
+  protected double calcWeight(ReservationRequest stage) {
+    return ((double) stage.getDuration())
+        * stage.getCapability().getMemorySize() * stage.getNumContainers();
+  }
+
+  /** Duration of the stage, rounded up to a multiple of the step s. */
+  protected long getRoundedDuration(ReservationRequest stage, Long s) {
+    return stepRoundUp(stage.getDuration(), s);
+  }
+
+  /** Rounds t down to a multiple of s (assumes t >= 0 and s > 0). */
+  protected static long stepRoundDown(long t, long s) {
+    return (t / s) * s;
+  }
+
+  /** Rounds t up to a multiple of s (assumes t >= 0 and s > 0). */
+  protected static long stepRoundUp(long t, long s) {
+    return ((t + s - 1) / s) * s;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3a615ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
new file mode 100644
index 0000000..cccd9d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * An implementation of {@link StageExecutionInterval} which gives each stage
+ * the maximal possible time interval, given the job constraints. Specifically,
+ * for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For
+ * ORDER jobs, the stage cannot start before the job's existing allocations end
+ * (if allocateLeft == true) or cannot end after they start (if allocateLeft ==
+ * false).
+ */
+public class StageExecutionIntervalUnconstrained implements
+    StageExecutionInterval {
+
+  /**
+   * Computes the maximal interval in which the stage can get resources;
+   * {@code plan} and {@code currentReservationStage} are not consulted.
+   */
+  @Override
+  public ReservationInterval computeExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage, boolean allocateLeft,
+      RLESparseResourceAllocation allocations) {
+
+    // Primitive longs: boxing the bounds bought nothing
+    long stageArrival = reservation.getArrival();
+    long stageDeadline = reservation.getDeadline();
+
+    ReservationRequestInterpreter jobType =
+        reservation.getReservationRequests().getInterpreter();
+    boolean ordered = (jobType == ReservationRequestInterpreter.R_ORDER)
+        || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP);
+
+    // Only ORDER jobs are bounded by what was already allocated; -1 is
+    // presumably the "nothing allocated yet" marker and keeps the job bound
+    if (ordered && allocateLeft) {
+      long allocationEndTime = allocations.getLatestNonNullTime();
+      if (allocationEndTime != -1) {
+        stageArrival = allocationEndTime;
+      }
+    } else if (ordered) {
+      long allocationStartTime = allocations.getEarliestStartTime();
+      if (allocationStartTime != -1) {
+        stageDeadline = allocationStartTime;
+      }
+    }
+    return new ReservationInterval(stageArrival, stageDeadline);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org