You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 09:00:10 UTC
[36/50] hadoop git commit: YARN-4360. Improve GreedyReservationAgent
to support "early" allocations,
and performance improvements (curino via asuresh)
YARN-4360. Improve GreedyReservationAgent to support "early" allocations, and performance improvements (curino via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cf5c41a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cf5c41a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cf5c41a
Branch: refs/heads/yarn-2877
Commit: 5cf5c41a895f5ab8bf6270089f8cfdea50573a97
Parents: a429f85
Author: Arun Suresh <as...@apache.org>
Authored: Wed Feb 10 09:11:15 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Feb 10 09:11:15 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reservation/CapacityOverTimePolicy.java | 9 +
.../planning/AlignedPlannerWithGreedy.java | 4 +-
.../planning/GreedyReservationAgent.java | 40 ++-
.../reservation/planning/IterativePlanner.java | 239 ++++++++++++------
.../planning/StageAllocatorGreedyRLE.java | 245 +++++++++++++++++++
.../reservation/ReservationSystemTestUtil.java | 19 +-
.../planning/TestGreedyReservationAgent.java | 120 +++++++--
8 files changed, 565 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3a0e0b1..ff37e1a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -799,6 +799,9 @@ Release 2.8.0 - UNRELEASED
YARN-4662. Document some newly added metrics. (Jian He via xgong)
+ YARN-4360. Improve GreedyReservationAgent to support "early" allocations,
+ and performance improvements (curino via asuresh)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index 424b543..80f6c88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -214,6 +214,15 @@ public class CapacityOverTimePolicy implements SharingPolicy {
RLESparseResourceAllocation used =
plan.getConsumptionForUserOverTime(user, start, end);
+ // add back in old reservation used resources if any
+ ReservationAllocation old = plan.getReservationById(oldId);
+ if (old != null) {
+ used =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ Resources.clone(plan.getTotalCapacity()), used,
+ old.getResourcesOverTime(), RLEOperator.subtract, start, end);
+ }
+
instRLEQuota =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
index a389928..b23cf1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -58,13 +58,13 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
// LowCostAligned planning algorithm
ReservationAgent algAligned =
new IterativePlanner(new StageEarliestStartByDemand(),
- new StageAllocatorLowCostAligned(smoothnessFactor));
+ new StageAllocatorLowCostAligned(smoothnessFactor), false);
listAlg.add(algAligned);
// Greedy planning algorithm
ReservationAgent algGreedy =
new IterativePlanner(new StageEarliestStartByJobArrival(),
- new StageAllocatorGreedy());
+ new StageAllocatorGreedy(), false);
listAlg.add(algGreedy);
// Set planner:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
index db82a66..915a834 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@@ -45,9 +46,44 @@ public class GreedyReservationAgent implements ReservationAgent {
.getLogger(GreedyReservationAgent.class);
// Greedy planner
- private final ReservationAgent planner = new IterativePlanner(
- new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+ private final ReservationAgent planner;
+ public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
+ "yarn.resourcemanager.reservation-system.favor-early-allocation";
+
+ public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
+
+ private final boolean allocateLeft;
+
+ public GreedyReservationAgent() {
+ this(new Configuration());
+ }
+
+ public GreedyReservationAgent(Configuration yarnConfiguration) {
+
+ allocateLeft =
+ yarnConfiguration.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
+ DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
+
+ if (allocateLeft) {
+ LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
+ + " (left) allocations (controlled by parameter: "
+ + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+ } else {
+ LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
+ + " (right) allocations (controlled by parameter: "
+ + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+ }
+
+ planner =
+ new IterativePlanner(new StageEarliestStartByJobArrival(),
+ new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
+
+ }
+
+ public boolean isAllocateLeft(){
+ return allocateLeft;
+ }
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 77362d5..24d237a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -69,11 +72,13 @@ public class IterativePlanner extends PlanningAlgorithm {
// Phase algorithms
private StageEarliestStart algStageEarliestStart = null;
private StageAllocator algStageAllocator = null;
+ private final boolean allocateLeft;
// Constructor
public IterativePlanner(StageEarliestStart algEarliestStartTime,
- StageAllocator algStageAllocator) {
+ StageAllocator algStageAllocator, boolean allocateLeft) {
+ this.allocateLeft = allocateLeft;
setAlgStageEarliestStart(algEarliestStartTime);
setAlgStageAllocator(algStageAllocator);
@@ -85,61 +90,49 @@ public class IterativePlanner extends PlanningAlgorithm {
String user) throws PlanningException {
// Initialize
- initialize(plan, reservation);
-
- // If the job has been previously reserved, logically remove its allocation
- ReservationAllocation oldReservation =
- plan.getReservationById(reservationId);
- if (oldReservation != null) {
- ignoreOldAllocation(oldReservation);
- }
+ initialize(plan, reservationId, reservation);
// Create the allocations data structure
RLESparseResourceAllocation allocations =
new RLESparseResourceAllocation(plan.getResourceCalculator());
- // Get a reverse iterator for the set of stages
- ListIterator<ReservationRequest> li =
- reservation
- .getReservationRequests()
- .getReservationResources()
- .listIterator(
- reservation.getReservationRequests().getReservationResources()
- .size());
+ StageProvider stageProvider = new StageProvider(allocateLeft, reservation);
// Current stage
ReservationRequest currentReservationStage;
- // Index, points on the current node
- int index =
- reservation.getReservationRequests().getReservationResources().size();
-
// Stage deadlines
long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
long successorStartingTime = -1;
+ long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
+ long stageArrivalTime = -1;
// Iterate the stages in reverse order
- while (li.hasPrevious()) {
+ while (stageProvider.hasNext()) {
// Get current stage
- currentReservationStage = li.previous();
- index -= 1;
+ currentReservationStage = stageProvider.next();
// Validate that the ReservationRequest respects basic constraints
validateInputStage(plan, currentReservationStage);
// Compute an adjusted earliestStart for this resource
// (we need this to provision some space for the ORDER contracts)
- long stageArrivalTime = reservation.getArrival();
- if (jobType == ReservationRequestInterpreter.R_ORDER
- || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
- stageArrivalTime =
- computeEarliestStartingTime(plan, reservation, index,
- currentReservationStage, stageDeadline);
- }
- stageArrivalTime = stepRoundUp(stageArrivalTime, step);
- stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+ if (allocateLeft) {
+ stageArrivalTime = predecessorEndTime;
+ } else {
+ stageArrivalTime = reservation.getArrival();
+ if (jobType == ReservationRequestInterpreter.R_ORDER
+ || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+ stageArrivalTime =
+ computeEarliestStartingTime(plan, reservation,
+ stageProvider.getCurrentIndex(), currentReservationStage,
+ stageDeadline);
+ }
+ stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+ stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+ }
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage,
@@ -155,7 +148,7 @@ public class IterativePlanner extends PlanningAlgorithm {
}
// Otherwise, the job cannot be allocated
- return null;
+ throw new PlanningException("The request cannot be satisfied");
}
@@ -177,33 +170,41 @@ public class IterativePlanner extends PlanningAlgorithm {
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+ // CHECK ORDER_NO_GAP
// Verify that there is no gap, in case the job is ORDER_NO_GAP
+ // note that the test is different left-to-right and right-to-left
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& successorStartingTime != -1
- && successorStartingTime > stageEndTime) {
-
- return null;
-
+ && ((allocateLeft && predecessorEndTime < stageStartTime) ||
+ (!allocateLeft && (stageEndTime < successorStartingTime))
+ )
+ || (!isNonPreemptiveAllocation(curAlloc))) {
+ throw new PlanningException(
+ "The allocation found does not respect ORDER_NO_GAP");
}
- // Store the stageStartTime and set the new stageDeadline
- successorStartingTime = stageStartTime;
- stageDeadline = stageStartTime;
-
+ if (allocateLeft) {
+ // Store the stageEndTime as the predecessorEndTime for the next stage
+ predecessorEndTime = stageEndTime;
+ } else {
+ // Store the stageStartTime and set the new stageDeadline
+ successorStartingTime = stageStartTime;
+ stageDeadline = stageStartTime;
+ }
}
-
}
// If the allocation is empty, return an error
if (allocations.isEmpty()) {
- return null;
+ throw new PlanningException("The request cannot be satisfied");
}
return allocations;
}
- protected void initialize(Plan plan, ReservationDefinition reservation) {
+ protected void initialize(Plan plan, ReservationId reservationId,
+ ReservationDefinition reservation) throws PlanningException {
// Get plan step & capacity
capacity = plan.getTotalCapacity();
@@ -214,13 +215,26 @@ public class IterativePlanner extends PlanningAlgorithm {
jobArrival = stepRoundUp(reservation.getArrival(), step);
jobDeadline = stepRoundDown(reservation.getDeadline(), step);
- // Dirty read of plan load
- planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
-
// Initialize the plan modifications
planModifications =
new RLESparseResourceAllocation(plan.getResourceCalculator());
+ // Dirty read of plan load
+
+ // planLoads are not used by other StageAllocators... and don't deal
+ // well with huge reservation ranges
+ if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
+ planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+ ReservationAllocation oldRes = plan.getReservationById(reservationId);
+ if (oldRes != null) {
+ planModifications =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), planModifications,
+ oldRes.getResourcesOverTime(), RLEOperator.subtract,
+ jobArrival, jobDeadline);
+ }
+ }
+
}
private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
@@ -240,32 +254,6 @@ public class IterativePlanner extends PlanningAlgorithm {
}
- private void ignoreOldAllocation(ReservationAllocation oldReservation) {
-
- // If there is no old reservation, return
- if (oldReservation == null) {
- return;
- }
-
- // Subtract each allocation interval from the planModifications
- for (Entry<ReservationInterval, Resource> entry : oldReservation
- .getAllocationRequests().entrySet()) {
-
- // Read the entry
- ReservationInterval interval = entry.getKey();
- Resource resource = entry.getValue();
-
- // Find the actual request
- Resource negativeResource = Resources.multiply(resource, -1);
-
- // Insert it into planModifications as a 'negative' request, to
- // represent available resources
- planModifications.addInterval(interval, negativeResource);
-
- }
-
- }
-
private void validateInputStage(Plan plan, ReservationRequest rr)
throws ContractValidationException {
@@ -291,13 +279,56 @@ public class IterativePlanner extends PlanningAlgorithm {
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException(
- "Individual capability requests should not exceed cluster's " +
- "maxAlloc");
+ "Individual capability requests should not exceed cluster's "
+ + "maxAlloc");
}
}
+ private boolean isNonPreemptiveAllocation(
+ Map<ReservationInterval, Resource> curAlloc) {
+
+ // Checks whether a stage allocation is non preemptive or not.
+ // Assumption: the intervals are non-intersecting (as returned by
+ // computeStageAllocation()).
+ // For a non-preemptive allocation, only two end points appear exactly once
+
+ Set<Long> endPoints = new HashSet<Long>(2 * curAlloc.size());
+ for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+
+ ReservationInterval interval = entry.getKey();
+ Resource resource = entry.getValue();
+
+ // Ignore intervals with no allocation
+ if (Resources.equals(resource, Resource.newInstance(0, 0))) {
+ continue;
+ }
+
+ // Get endpoints
+ Long left = interval.getStartTime();
+ Long right = interval.getEndTime();
+
+ // Add left endpoint if we haven't seen it before, remove otherwise
+ if (!endPoints.contains(left)) {
+ endPoints.add(left);
+ } else {
+ endPoints.remove(left);
+ }
+
+ // Add right endpoint if we haven't seen it before, remove otherwise
+ if (!endPoints.contains(right)) {
+ endPoints.add(right);
+ } else {
+ endPoints.remove(right);
+ }
+ }
+
+ // Non-preemptive only if endPoints is of size 2
+ return (endPoints.size() == 2);
+
+ }
+
// Call algEarliestStartTime()
protected long computeEarliestStartingTime(Plan plan,
ReservationDefinition reservation, int index,
@@ -335,4 +366,60 @@ public class IterativePlanner extends PlanningAlgorithm {
}
+ /**
+ * Helper class that provides a list of ReservationRequests and iterates
+ * forward or backward depending whether we are allocating left-to-right or
+ * right-to-left.
+ */
+ public static class StageProvider {
+
+ private final boolean allocateLeft;
+
+ private ListIterator<ReservationRequest> li;
+
+ public StageProvider(boolean allocateLeft,
+ ReservationDefinition reservation) {
+
+ this.allocateLeft = allocateLeft;
+ int startingIndex;
+ if (allocateLeft) {
+ startingIndex = 0;
+ } else {
+ startingIndex =
+ reservation.getReservationRequests().getReservationResources()
+ .size();
+ }
+ // Get a list iterator over the stages, positioned at startingIndex
+ li =
+ reservation.getReservationRequests().getReservationResources()
+ .listIterator(startingIndex);
+
+ }
+
+ public boolean hasNext() {
+ if (allocateLeft) {
+ return li.hasNext();
+ } else {
+ return li.hasPrevious();
+ }
+ }
+
+ public ReservationRequest next() {
+ if (allocateLeft) {
+ return li.next();
+ } else {
+ return li.previous();
+ }
+ }
+
+ public int getCurrentIndex() {
+ if (allocateLeft) {
+ return li.nextIndex() - 1;
+ } else {
+ return li.previousIndex() + 1;
+ }
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
new file mode 100644
index 0000000..c5a3192
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the leftmost or
+ * rightmost possible interval. This implementation leverages the
+ * run-length-encoding of the time-series we operate on and proceeds more quickly
+ * than the baseline.
+ */
+
+public class StageAllocatorGreedyRLE implements StageAllocator {
+
+ private final boolean allocateLeft;
+
+ public StageAllocatorGreedyRLE(boolean allocateLeft) {
+ this.allocateLeft = allocateLeft;
+ }
+
+ @Override
+ public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+ Map<Long, Resource> planLoads,
+ RLESparseResourceAllocation planModifications, ReservationRequest rr,
+ long stageEarliestStart, long stageDeadline, String user,
+ ReservationId oldId) throws PlanningException {
+
+ // abort early if the interval is not satisfiable
+ if (stageEarliestStart + rr.getDuration() > stageDeadline) {
+ return null;
+ }
+
+ Map<ReservationInterval, Resource> allocationRequests =
+ new HashMap<ReservationInterval, Resource>();
+
+ Resource totalCapacity = plan.getTotalCapacity();
+
+ // compute the gang as a resource and get the duration
+ Resource sizeOfGang =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency());
+ long dur = rr.getDuration();
+ long step = plan.getStep();
+
+ // ceil the duration to the next multiple of the plan step
+ if (dur % step != 0) {
+ dur += (step - (dur % step));
+ }
+
+ // we know for sure that this division has no remainder (part of contract
+ // with user, validated before)
+ int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+ // get available resources from plan
+ RLESparseResourceAllocation netRLERes =
+ plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+ stageDeadline);
+
+ // remove plan modifications
+ netRLERes =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ totalCapacity, netRLERes, planModifications, RLEOperator.subtract,
+ stageEarliestStart, stageDeadline);
+
+ // loop trying to place until we are done, or we are considering
+ // an invalid range of times
+ while (gangsToPlace > 0 && stageEarliestStart + dur <= stageDeadline) {
+
+ // as we run along we remember how many gangs we can fit, and what
+ // was the most constraining moment in time (we will restart just
+ // after that to place the next batch)
+ int maxGang = gangsToPlace;
+ long minPoint = -1;
+
+ // focus our attention to a time-range under consideration
+ NavigableMap<Long, Resource> partialMap =
+ netRLERes.getRangeOverlapping(stageEarliestStart, stageDeadline)
+ .getCumulative();
+
+ // reverse the map for right-to-left allocation
+ if (!allocateLeft) {
+ partialMap = partialMap.descendingMap();
+ }
+
+ Iterator<Entry<Long, Resource>> netIt = partialMap.entrySet().iterator();
+
+ long oldT = stageDeadline;
+
+ // internal loop, tries to allocate as many gangs as possible starting
+ // at a given point in time, if it fails we move to the next time
+ // interval (with outside loop)
+ while (maxGang > 0 && netIt.hasNext()) {
+
+ long t;
+ Resource curAvailRes;
+
+ Entry<Long, Resource> e = netIt.next();
+ if (allocateLeft) {
+ t = Math.max(e.getKey(), stageEarliestStart);
+ curAvailRes = e.getValue();
+ } else {
+ t = oldT;
+ oldT = e.getKey();
+ //attention: higher means lower, because we reversed the map direction
+ curAvailRes = partialMap.higherEntry(t).getValue();
+ }
+
+ // check exit/skip conditions
+ if (curAvailRes == null) {
+ //skip undefined regions (should not happen beside borders)
+ continue;
+ }
+ if (exitCondition(t, stageEarliestStart, stageDeadline, dur)) {
+ break;
+ }
+
+ // compute maximum number of gangs we could fit
+ int curMaxGang =
+ (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+ totalCapacity, curAvailRes, sizeOfGang));
+ curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+ // compare with previous max, and set it. also remember *where* we found
+ // the minimum (useful for next attempts)
+ if (curMaxGang <= maxGang) {
+ maxGang = curMaxGang;
+ minPoint = t;
+ }
+ }
+
+ // update data structures that retain the progress made so far
+ gangsToPlace =
+ trackProgress(planModifications, rr, stageEarliestStart,
+ stageDeadline, allocationRequests, dur, gangsToPlace, maxGang);
+
+ // reset the next range of time-intervals to deal with
+ if (allocateLeft) {
+ // set earliest start to the min of the constraining "range" or the
+ // end of this allocation
+ stageEarliestStart =
+ Math.min(partialMap.higherKey(minPoint), stageEarliestStart + dur);
+ } else {
+ // same as above moving right-to-left
+ stageDeadline =
+ Math.max(partialMap.higherKey(minPoint), stageDeadline - dur);
+ }
+ }
+
+ // if no gangs are left to place we succeed and return the allocation
+ if (gangsToPlace == 0) {
+ return allocationRequests;
+ } else {
+ // If we are here it is because we did not manage to satisfy this request.
+ // So we need to remove unwanted side-effects from planModifications
+ // (needed for ANY).
+ for (Map.Entry<ReservationInterval, Resource> tempAllocation :
+ allocationRequests.entrySet()) {
+ planModifications.removeInterval(tempAllocation.getKey(),
+ tempAllocation.getValue());
+ }
+ // and return null to signal failure in this allocation
+ return null;
+ }
+
+ }
+
+ private int trackProgress(RLESparseResourceAllocation planModifications,
+ ReservationRequest rr, long stageEarliestStart, long stageDeadline,
+ Map<ReservationInterval, Resource> allocationRequests, long dur,
+ int gangsToPlace, int maxGang) {
+ // if we were able to place any gang, record this, and decrement
+ // gangsToPlace
+ if (maxGang > 0) {
+ gangsToPlace -= maxGang;
+
+ ReservationInterval reservationInt =
+ computeReservationInterval(stageEarliestStart, stageDeadline, dur);
+ Resource reservationRes =
+ Resources.multiply(rr.getCapability(), rr.getConcurrency() * maxGang);
+ // remember occupied space (plan is read-only till we find a plausible
+ // allocation for the entire request). This is needed since we might be
+ // placing other ReservationRequest within the same
+ // ReservationDefinition,
+ // and we must avoid double-counting the available resources
+ planModifications.addInterval(reservationInt, reservationRes);
+ allocationRequests.put(reservationInt, reservationRes);
+
+ }
+ return gangsToPlace;
+ }
+
+ private ReservationInterval computeReservationInterval(
+ long stageEarliestStart, long stageDeadline, long dur) {
+ ReservationInterval reservationInt;
+ if (allocateLeft) {
+ reservationInt =
+ new ReservationInterval(stageEarliestStart, stageEarliestStart + dur);
+ } else {
+ reservationInt =
+ new ReservationInterval(stageDeadline - dur, stageDeadline);
+ }
+ return reservationInt;
+ }
+
+
+ private boolean exitCondition(long t, long stageEarliestStart,
+ long stageDeadline, long dur) {
+ if (allocateLeft) {
+ return t >= stageEarliestStart + dur;
+ } else {
+ return t < stageDeadline - dur;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 0aedc6a..4aef7ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import java.io.FileWriter;
import java.io.IOException;
@@ -75,12 +73,14 @@ public class ReservationSystemTestUtil {
public static ReservationSchedulerConfiguration createConf(
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
- ReservationSchedulerConfiguration conf =
- mock(ReservationSchedulerConfiguration.class);
+
+ ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
+ ReservationSchedulerConfiguration conf = spy(realConf);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ))
.thenReturn(instConstraint);
when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
+
return conf;
}
@@ -177,10 +177,15 @@ public class ReservationSystemTestUtil {
public static ReservationDefinition createSimpleReservationDefinition(
long arrival, long deadline, long duration) {
+ return createSimpleReservationDefinition(arrival, deadline, duration, 1);
+ }
+
+ public static ReservationDefinition createSimpleReservationDefinition(
+ long arrival, long deadline, long duration, int parallelism) {
// create a request with a single atomic ask
ReservationRequest r =
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
- duration);
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ parallelism, parallelism, duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cf5c41a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index f81e7ec..b8a618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -55,8 +57,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.mortbay.log.Log;
+@RunWith(Parameterized.class)
public class TestGreedyReservationAgent {
ReservationAgent agent;
@@ -66,6 +72,17 @@ public class TestGreedyReservationAgent {
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
+ boolean allocateLeft;
+
+ /**
+ * Creates a test instance for one allocation direction. Under the
+ * Parameterized runner the argument is the single value of a row returned
+ * by {@link #data()}.
+ *
+ * @param b value stored in {@code allocateLeft}; must not be null since it
+ * is unboxed
+ */
+ public TestGreedyReservationAgent(Boolean b) {
+ allocateLeft = b.booleanValue();
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false}});
+ }
@Before
public void setup() throws Exception {
@@ -90,7 +107,11 @@ public class TestGreedyReservationAgent {
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
- agent = new GreedyReservationAgent();
+ // configure whether the agent favors early allocation (allocateLeft)
+ conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION,
+ allocateLeft);
+
+ agent = new GreedyReservationAgent(conf);
QueueMetrics queueMetrics = mock(QueueMetrics.class);
RMContext context = ReservationSystemTestUtil.createMockRMContext();
@@ -130,13 +151,21 @@ public class TestGreedyReservationAgent {
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
- for (long i = 10 * step; i < 20 * step; i++) {
- assertTrue(
- "Agent-based allocation unexpected",
- Resources.equals(cs.getResourcesAtTime(i),
- Resource.newInstance(2048 * 10, 2 * 10)));
+ if(allocateLeft){
+ for (long i = 5 * step; i < 15 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 10, 2 * 10)));
+ }
+ } else {
+ for (long i = 10 * step; i < 20 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 10, 2 * 10)));
+ }
}
-
}
@SuppressWarnings("javadoc")
@@ -212,18 +241,33 @@ public class TestGreedyReservationAgent {
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
- for (long i = 90 * step; i < 100 * step; i++) {
- assertTrue(
- "Agent-based allocation unexpected",
- Resources.equals(cs.getResourcesAtTime(i),
- Resource.newInstance(2048 * 20, 2 * 20)));
- }
- // RR2 is pushed out by the presence of RR
- for (long i = 80 * step; i < 90 * step; i++) {
- assertTrue(
- "Agent-based allocation unexpected",
- Resources.equals(cs2.getResourcesAtTime(i),
- Resource.newInstance(2048 * 20, 2 * 20)));
+ if (allocateLeft) {
+ for (long i = 5 * step; i < 15 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
+ for (long i = 15 * step; i < 25 * step; i++) {
+ // RR2 is pushed out by the presence of RR
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs2.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
+ } else {
+ for (long i = 90 * step; i < 100 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
+ for (long i = 80 * step; i < 90 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs2.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
}
}
@@ -274,10 +318,18 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
- assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
- assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+ if (allocateLeft) {
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 32 * step, 42 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 42 * step, 62 * step, 10, 1024, 1));
+
+ } else {
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+ }
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
@@ -466,7 +518,12 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
- assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+ if (allocateLeft) {
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 5, 1024, 1));
+ } else {
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+ }
+
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
@@ -551,8 +608,13 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
- assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
- assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+ if (allocateLeft) {
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 25, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
+ } else {
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+ }
System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ ")----------");
@@ -695,14 +757,18 @@ public class TestGreedyReservationAgent {
public static void main(String[] arg) {
+ boolean left = false;
// run a stress test with by default 1000 random jobs
int numJobs = 1000;
if (arg.length > 0) {
numJobs = Integer.parseInt(arg[0]);
}
+ if (arg.length > 1) {
+ left = Boolean.parseBoolean(arg[1]);
+ }
try {
- TestGreedyReservationAgent test = new TestGreedyReservationAgent();
+ TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
test.setup();
test.testStress(numJobs);
} catch (Exception e) {