You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/12/06 08:13:40 UTC
[38/38] hadoop git commit: YARN-4358. Reservation System: Improve
relationship between SharingPolicy and ReservationAgent. (Carlo Curino via
asuresh)
YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/742632e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/742632e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/742632e3
Branch: refs/heads/yarn-2877
Commit: 742632e346604fd2b263bd42367165638fcf2416
Parents: 42d4901
Author: Arun Suresh <as...@apache.org>
Authored: Sat Dec 5 21:26:16 2015 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Sat Dec 5 21:26:16 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reservation/CapacityOverTimePolicy.java | 52 +++++++-
.../reservation/InMemoryPlan.java | 123 ++++++++++++++++++-
.../InMemoryReservationAllocation.java | 13 +-
.../reservation/NoOverCommitPolicy.java | 8 ++
.../resourcemanager/reservation/PlanView.java | 65 ++++++++--
.../reservation/ReservationAllocation.java | 12 +-
.../reservation/SharingPolicy.java | 24 +++-
.../reservation/planning/IterativePlanner.java | 16 +--
.../reservation/planning/PlanningAlgorithm.java | 34 ++---
.../reservation/planning/StageAllocator.java | 6 +-
.../planning/StageAllocatorGreedy.java | 23 ++--
.../planning/StageAllocatorLowCostAligned.java | 14 ++-
.../reservation/TestInMemoryPlan.java | 72 ++++++-----
.../planning/TestGreedyReservationAgent.java | 94 +++++++++++++-
15 files changed, 464 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 90ada4b..1fed6a6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -592,6 +592,9 @@ Release 2.8.0 - UNRELEASED
YARN-4405. Support node label store in non-appendable file system. (Wangda
Tan via jianhe)
+ YARN-4358. Reservation System: Improve relationship between SharingPolicy
+ and ReservationAgent. (Carlo Curino via asuresh)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
index afba7ea..424b543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Date;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
@@ -104,14 +108,17 @@ public class CapacityOverTimePolicy implements SharingPolicy {
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
maxAllowed.multiplyBy(validWindow / step);
+ RLESparseResourceAllocation userCons =
+ plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
+ - validWindow, endTime + validWindow);
+
// check that the resources offered to the user during any window of length
// "validWindow" overlapping this allocation are within maxAllowed
// also enforce instantaneous and physical constraints during this pass
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
- Resource currExistingAllocForUser =
- plan.getConsumptionForUser(reservation.getUser(), t);
+ Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resources.none();
if (oldReservation != null) {
@@ -163,8 +170,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
// expire contributions from instant in time before (t - validWindow)
if (t > startTime) {
- Resource pastOldAlloc =
- plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
+ Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
@@ -189,6 +195,39 @@ public class CapacityOverTimePolicy implements SharingPolicy {
}
@Override
+ public RLESparseResourceAllocation availableResources(
+ RLESparseResourceAllocation available, Plan plan, String user,
+ ReservationId oldId, long start, long end) throws PlanningException {
+
+ // this only propagates the instantaneous maxInst properties, while
+ // the time-varying ones depend on the current allocation as well
+ // and are not easily captured here
+ Resource planTotalCapacity = plan.getTotalCapacity();
+ Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+ NavigableMap<Long, Resource> instQuota = new TreeMap<Long, Resource>();
+ instQuota.put(start, maxInsRes);
+
+ RLESparseResourceAllocation instRLEQuota =
+ new RLESparseResourceAllocation(instQuota,
+ plan.getResourceCalculator());
+
+ RLESparseResourceAllocation used =
+ plan.getConsumptionForUserOverTime(user, start, end);
+
+ instRLEQuota =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
+ end);
+
+ instRLEQuota =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
+ end);
+
+ return instRLEQuota;
+ }
+
+ @Override
public long getValidWindow() {
return validWindow;
}
@@ -198,7 +237,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
* long(s), as using Resource to store the "integral" of the allocation over
* time leads to integer overflows for large allocations/clusters. (Evolving
* Resource to use long is too disruptive at this point.)
- *
+ *
* The comparison/multiplication behaviors of IntegralResource are consistent
* with the DefaultResourceCalculator.
*/
@@ -244,4 +283,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
return "<memory:" + memory + ", vCores:" + vcores + ">";
}
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index af42df9..c51c3ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -27,11 +27,13 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@@ -65,6 +67,9 @@ public class InMemoryPlan implements Plan {
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
+ private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
+ new HashMap<String, RLESparseResourceAllocation>();
+
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
new HashMap<ReservationId, InMemoryReservationAllocation>();
@@ -121,6 +126,7 @@ public class InMemoryPlan implements Plan {
return queueMetrics;
}
+
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, Resource> allocationRequests =
@@ -132,11 +138,27 @@ public class InMemoryPlan implements Plan {
resAlloc = new RLESparseResourceAllocation(resCalc);
userResourceAlloc.put(user, resAlloc);
}
+ RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
+ if (resCount == null) {
+ resCount = new RLESparseResourceAllocation(resCalc);
+ userActiveReservationCount.put(user, resCount);
+ }
+
+ long earliestActive = Long.MAX_VALUE;
+ long latestActive = Long.MIN_VALUE;
+
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
+ }
}
+ resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
+ Resource.newInstance(1, 1));
}
private void decrementAllocation(ReservationAllocation reservation) {
@@ -145,14 +167,29 @@ public class InMemoryPlan implements Plan {
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+
+ long earliestActive = Long.MAX_VALUE;
+ long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
+ }
}
if (resAlloc.isEmpty()) {
userResourceAlloc.remove(user);
}
+
+ RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
+ resCount.removeInterval(new ReservationInterval(earliestActive,
+ latestActive), Resource.newInstance(1, 1));
+ if (resCount.isEmpty()) {
+ userActiveReservationCount.remove(user);
+ }
}
public Set<ReservationAllocation> getAllReservations() {
@@ -160,9 +197,9 @@ public class InMemoryPlan implements Plan {
try {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
- new HashSet<ReservationAllocation>();
- for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
- .values()) {
+ new TreeSet<ReservationAllocation>();
+ for (Set<InMemoryReservationAllocation> reservationEntries :
+ currentReservations.values()) {
flattenedReservations.addAll(reservationEntries);
}
return flattenedReservations;
@@ -417,14 +454,34 @@ public class InMemoryPlan implements Plan {
}
@Override
- public Resource getConsumptionForUser(String user, long t) {
+ public RLESparseResourceAllocation getReservationCountForUserOverTime(
+ String user, long start, long end) {
+ readLock.lock();
+ try {
+ RLESparseResourceAllocation userResAlloc =
+ userActiveReservationCount.get(user);
+
+ if (userResAlloc != null) {
+ return userResAlloc.getRangeOverlapping(start, end);
+ } else {
+ return new RLESparseResourceAllocation(resCalc);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+ long start, long end) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
+
if (userResAlloc != null) {
- return userResAlloc.getCapacityAtTime(t);
+ return userResAlloc.getRangeOverlapping(start, end);
} else {
- return Resources.clone(ZERO_RESOURCE);
+ return new RLESparseResourceAllocation(resCalc);
}
} finally {
readLock.unlock();
@@ -465,6 +522,43 @@ public class InMemoryPlan implements Plan {
}
@Override
+ public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+ ReservationId oldId, long start, long end) throws PlanningException {
+ readLock.lock();
+ try {
+ // create RLE of totCapacity
+ TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
+ totAvailable.put(start, Resources.clone(totalCapacity));
+ RLESparseResourceAllocation totRLEAvail =
+ new RLESparseResourceAllocation(totAvailable, resCalc);
+
+ // subtract used from available
+ RLESparseResourceAllocation netAvailable;
+
+ netAvailable =
+ RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
+ RLEOperator.subtractTestNonNegative, start, end);
+
+ // add back in old reservation used resources if any
+ ReservationAllocation old = reservationTable.get(oldId);
+ if (old != null) {
+ netAvailable =
+ RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), netAvailable,
+ old.getResourcesOverTime(), RLEOperator.add, start, end);
+ }
+ // lower it if this is needed by the sharing policy
+ netAvailable =
+ getSharingPolicy().availableResources(netAvailable, this, user,
+ oldId, start, end);
+ return netAvailable;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
public Resource getMinimumAllocation() {
return Resources.clone(minAlloc);
}
@@ -549,4 +643,21 @@ public class InMemoryPlan implements Plan {
}
}
+ @Override
+ public Set<ReservationAllocation> getReservationByUserAtTime(String user,
+ long t) {
+ readLock.lock();
+ try {
+ Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
+ for (ReservationAllocation ra : getReservationsAtTime(t)) {
+ String resUser = ra.getUser();
+ if (resUser != null && resUser.equals(user)) {
+ resSet.add(ra);
+ }
+ }
+ return resSet;
+ } finally {
+ readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 55ab066..69fd43f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -133,11 +133,16 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
}
@Override
+ public RLESparseResourceAllocation getResourcesOverTime(){
+ return resourcesOverTime;
+ }
+
+ @Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
- .append(getEndTime()).append(" alloc:[")
+ .append(getEndTime()).append(" alloc:\n[")
.append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
@@ -151,6 +156,12 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
return 1;
}
+ if (this.getReservationId().getId() > other.getReservationId().getId()) {
+ return -1;
+ }
+ if (this.getReservationId().getId() < other.getReservationId().getId()) {
+ return 1;
+ }
return 0;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index f87e9dc..119520b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@@ -89,4 +90,11 @@ public class NoOverCommitPolicy implements SharingPolicy {
// nothing to do for this policy
}
+ @Override
+ public RLESparseResourceAllocation availableResources(
+ RLESparseResourceAllocation available, Plan plan, String user,
+ ReservationId oldId, long start, long end) throws PlanningException {
+ return available;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 66c66ca..f57c2e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import java.util.Set;
@@ -41,6 +42,17 @@ public interface PlanView extends PlanContext {
public ReservationAllocation getReservationById(ReservationId reservationID);
/**
+ * Return a set of {@link ReservationAllocation} that belongs to a certain
+ * user and overlaps time t.
+ *
+ * @param user the user being considered
+ * @param t the instant in time being considered
+ * @return {@link Set} of {@link ReservationAllocation} for this user at this time
+ */
+ public Set<ReservationAllocation> getReservationByUserAtTime(String user,
+ long t);
+
+ /**
* Gets all the active reservations at the specified point of time
*
* @param tick the time (UTC in ms) for which the active reservations are
@@ -68,18 +80,6 @@ public interface PlanView extends PlanContext {
Resource getTotalCommittedResources(long tick);
/**
- * Returns the total {@link Resource} reserved for a given user at the
- * specified time
- *
- * @param user the user who made the reservation(s)
- * @param tick the time (UTC in ms) for which the reserved resources are
- * requested
- * @return the total {@link Resource} reserved for a given user at the
- * specified time
- */
- public Resource getConsumptionForUser(String user, long tick);
-
- /**
* Returns the overall capacity in terms of {@link Resource} assigned to this
* plan (typically will correspond to the absolute capacity of the
* corresponding queue).
@@ -98,9 +98,48 @@ public interface PlanView extends PlanContext {
/**
* Returns the time (UTC in ms) at which the last reservation terminates
- *
+ *
* @return the time (UTC in ms) at which the last reservation terminates
*/
public long getLastEndTime();
+ /**
+ * This method returns the amount of resources available to a given user
+ * (optionally if removing a certain reservation) over the start-end time
+ * range.
+ *
+ * @param user
+ * @param oldId
+ * @param start
+ * @param end
+ * @return a view of the plan as it is available to this user
+ * @throws PlanningException
+ */
+ public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+ ReservationId oldId, long start, long end) throws PlanningException;
+
+ /**
+ * This method returns a RLE encoded view of the user reservation count
+ * utilization between start and end time.
+ *
+ * @param user
+ * @param start
+ * @param end
+ * @return RLE encoded view of reservation used over time
+ */
+ public RLESparseResourceAllocation getReservationCountForUserOverTime(
+ String user, long start, long end);
+
+ /**
+ * This method returns a RLE encoded view of the user reservation utilization
+ * between start and end time.
+ *
+ * @param user
+ * @param start
+ * @param end
+ * @return RLE encoded view of resources used over time
+ */
+ public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+ long start, long end);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 0d3c692..0da95ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -50,14 +50,14 @@ public interface ReservationAllocation extends
public ReservationDefinition getReservationDefinition();
/**
- * Returns the time at which the reservation is activated
+ * Returns the time at which the reservation is activated.
*
* @return the time at which the reservation is activated
*/
public long getStartTime();
/**
- * Returns the time at which the reservation terminates
+ * Returns the time at which the reservation terminates.
*
* @return the time at which the reservation terminates
*/
@@ -65,7 +65,7 @@ public interface ReservationAllocation extends
/**
* Returns the map of resources requested against the time interval for which
- * they were
+ * they were requested.
*
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
@@ -118,4 +118,10 @@ public interface ReservationAllocation extends
*/
public Resource getResourcesAtTime(long tick);
+ /**
+ * Return a RLE representation of used resources.
+ * @return a RLE encoding of resources allocated over time.
+ */
+ public RLESparseResourceAllocation getResourcesOverTime();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index 8f8d24c..e458055 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
@@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
public interface SharingPolicy {
/**
- * Initialize this policy
+ * Initialize this policy.
*
* @param planQueuePath the name of the queue for this plan
* @param conf the system configuration
@@ -54,6 +55,26 @@ public interface SharingPolicy {
throws PlanningException;
/**
+ * This method provides a (partial) instantaneous validation by applying
+ * business rules (such as max number of parallel containers allowed for a
+ * user). To provide the agent with more feedback the returned value is
+ * expressed in number of containers that can fit in this time according to
+ * the business rules.
+ *
+ * @param available the amount of resources that would be offered if not
+ * constrained by the policy
+ * @param plan reference to the current Plan
+ * @param user the username
+ * @param start the start time for the range we are querying
+ * @param end the end time for the range we are querying
+ * @param oldId (optional) the id of a reservation being updated
+ * @throws PlanningException throws if the request is not valid
+ */
+ public RLESparseResourceAllocation availableResources(
+ RLESparseResourceAllocation available, Plan plan, String user,
+ ReservationId oldId, long start, long end) throws PlanningException;
+
+ /**
* Returns the time range before and after the current reservation considered
* by this policy. In particular, this informs the archival process for the
* {@link Plan}, i.e., reservations regarding times before (now - validWindow)
@@ -63,4 +84,5 @@ public interface SharingPolicy {
*/
public long getValidWindow();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index d05b0ef..77362d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResour
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -80,8 +81,8 @@ public class IterativePlanner extends PlanningAlgorithm {
@Override
public RLESparseResourceAllocation computeJobAllocation(Plan plan,
- ReservationId reservationId, ReservationDefinition reservation)
- throws ContractValidationException {
+ ReservationId reservationId, ReservationDefinition reservation,
+ String user) throws PlanningException {
// Initialize
initialize(plan, reservation);
@@ -142,7 +143,7 @@ public class IterativePlanner extends PlanningAlgorithm {
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage,
- stageArrivalTime, stageDeadline);
+ stageArrivalTime, stageDeadline, user, reservationId);
// If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue).
@@ -159,8 +160,8 @@ public class IterativePlanner extends PlanningAlgorithm {
}
// Get the start & end time of the current allocation
- Long stageStartTime = findEarliestTime(curAlloc.keySet());
- Long stageEndTime = findLatestTime(curAlloc.keySet());
+ Long stageStartTime = findEarliestTime(curAlloc);
+ Long stageEndTime = findLatestTime(curAlloc);
// If we did find an allocation for the stage, add it
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@@ -310,10 +311,11 @@ public class IterativePlanner extends PlanningAlgorithm {
// Call algStageAllocator
protected Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, ReservationRequest rr, long stageArrivalTime,
- long stageDeadline) {
+ long stageDeadline, String user, ReservationId oldId)
+ throws PlanningException {
return algStageAllocator.computeStageAllocation(plan, planLoads,
- planModifications, rr, stageArrivalTime, stageDeadline);
+ planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 8b72b9f..e1b508d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -62,7 +62,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
// Compute the job allocation
RLESparseResourceAllocation allocation =
- computeJobAllocation(plan, reservationId, adjustedContract);
+ computeJobAllocation(plan, reservationId, adjustedContract, user);
// If no job allocation was found, fail
if (allocation == null) {
@@ -84,8 +84,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
adjustedContract, // Contract
user, // User name
plan.getQueueName(), // Queue name
- findEarliestTime(mapAllocations.keySet()), // Earliest start time
- findLatestTime(mapAllocations.keySet()), // Latest end time
+ findEarliestTime(mapAllocations), // Earliest start time
+ findLatestTime(mapAllocations), // Latest end time
mapAllocations, // Allocations
plan.getResourceCalculator(), // Resource calculator
plan.getMinimumAllocation()); // Minimum allocation
@@ -111,14 +111,14 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
Resource zeroResource = Resource.newInstance(0, 0);
// Pad at the beginning
- long earliestStart = findEarliestTime(mapAllocations.keySet());
+ long earliestStart = findEarliestTime(mapAllocations);
if (jobArrival < earliestStart) {
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
zeroResource);
}
// Pad at the end
- long latestEnd = findLatestTime(mapAllocations.keySet());
+ long latestEnd = findLatestTime(mapAllocations);
if (latestEnd < jobDeadline) {
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
zeroResource);
@@ -129,8 +129,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
}
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
- ReservationId reservationId, ReservationDefinition reservation)
- throws PlanningException, ContractValidationException;
+ ReservationId reservationId, ReservationDefinition reservation,
+ String user) throws PlanningException, ContractValidationException;
@Override
public boolean createReservation(ReservationId reservationId, String user,
@@ -162,24 +162,26 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
}
- protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+ protected static long findEarliestTime(
+ Map<ReservationInterval, Resource> sesInt) {
long ret = Long.MAX_VALUE;
- for (ReservationInterval s : sesInt) {
- if (s.getStartTime() < ret) {
- ret = s.getStartTime();
+ for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
+ if (s.getKey().getStartTime() < ret && s.getValue() != null) {
+ ret = s.getKey().getStartTime();
}
}
return ret;
}
- protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+ protected static long findLatestTime(Map<ReservationInterval,
+ Resource> sesInt) {
long ret = Long.MIN_VALUE;
- for (ReservationInterval s : sesInt) {
- if (s.getEndTime() > ret) {
- ret = s.getEndTime();
+ for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
+ if (s.getKey().getEndTime() > ret && s.getValue() != null) {
+ ret = s.getKey().getEndTime();
}
}
return ret;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index 9df6b74..b95f8d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* Interface for allocating a single stage in IterativePlanner.
@@ -46,10 +48,12 @@ public interface StageAllocator {
*
* @return The computed allocation (or null if the stage could not be
* allocated)
+ * @throws PlanningException if computing the stage allocation fails
*/
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
- long stageEarliestStart, long stageDeadline);
+ long stageEarliestStart, long stageDeadline, String user,
+ ReservationId oldId) throws PlanningException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index 773fbdf..c836970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -40,7 +43,8 @@ public class StageAllocatorGreedy implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
- long stageEarliestStart, long stageDeadline) {
+ long stageEarliestStart, long stageDeadline, String user,
+ ReservationId oldId) throws PlanningException {
Resource totalCapacity = plan.getTotalCapacity();
@@ -63,6 +67,15 @@ public class StageAllocatorGreedy implements StageAllocator {
int maxGang = 0;
+ RLESparseResourceAllocation netAvailable =
+ plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+ stageDeadline);
+
+ netAvailable =
+ RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), netAvailable, planModifications,
+ RLEOperator.subtract, stageEarliestStart, stageDeadline);
+
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
@@ -79,13 +92,7 @@ public class StageAllocatorGreedy implements StageAllocator {
for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
- // compute net available resources
- Resource netAvailableRes = Resources.clone(totalCapacity);
- // Resources.addTo(netAvailableRes, oldResCap);
- Resources.subtractFrom(netAvailableRes,
- plan.getTotalCommittedResources(t));
- Resources.subtractFrom(netAvailableRes,
- planModifications.getCapacityAtTime(t));
+ Resource netAvailableRes = netAvailable.getCapacityAtTime(t);
// compute maximum number of gangs we could fit
curMaxGang =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index 04cce7b..b9fd8e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.TreeSet;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@@ -60,7 +61,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
- long stageEarliestStart, long stageDeadline) {
+ long stageEarliestStart, long stageDeadline, String user,
+ ReservationId oldId) {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
@@ -136,7 +138,9 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
DurationInterval bestDurationInterval =
durationIntervalsSortedByCost.first();
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
-
+ numGangsToAllocate =
+ Math.min(numGangsToAllocate,
+ bestDurationInterval.numCanFit(gang, capacity, resCalc));
// Add it
remainingGangs -= numGangsToAllocate;
@@ -355,5 +359,11 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.cost = value;
}
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" start: " + startTime).append(" end: " + endTime)
+ .append(" cost: " + cost).append(" maxLoad: " + maxLoad);
+ return sb.toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 2e262a0..1756e86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -118,11 +118,18 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
+ checkAllocation(plan, alloc, start);
+ }
+
+ private void checkAllocation(Plan plan, int[] alloc, int start) {
+ RLESparseResourceAllocation userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
+
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- plan.getConsumptionForUser(user, start + i));
+ userCons.getCapacityAtTime(start + i));
}
}
@@ -180,12 +187,7 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
- for (int i = 0; i < alloc.length; i++) {
- Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- plan.getTotalCommittedResources(start + i));
- Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- plan.getConsumptionForUser(user, start + i));
- }
+ checkAllocation(plan, alloc, start);
// Try to add it again
try {
@@ -226,11 +228,14 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
+
+ RLESparseResourceAllocation userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
- plan.getConsumptionForUser(user, start + i));
+ userCons.getCapacityAtTime(start + i));
}
// Now update it
@@ -252,13 +257,18 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
+
+ userCons =
+ plan.getConsumptionForUserOverTime(user, start, start
+ + updatedAlloc.length);
+
for (int i = 0; i < updatedAlloc.length; i++) {
Assert.assertEquals(
- Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
- + i), plan.getConsumptionForUser(user, start + i));
+ + i), userCons.getCapacityAtTime(start + i));
}
}
@@ -321,13 +331,17 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
+
+ RLESparseResourceAllocation userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
+
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
- plan.getConsumptionForUser(user, start + i));
+ userCons.getCapacityAtTime(start + i));
}
// Now delete it
@@ -337,11 +351,13 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
+ userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
- plan.getConsumptionForUser(user, start + i));
+ userCons.getCapacityAtTime(start + i));
}
}
@@ -393,14 +409,8 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
- for (int i = 0; i < alloc1.length; i++) {
- Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
- plan.getTotalCommittedResources(start + i));
- Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
- plan.getConsumptionForUser(user, start + i));
- }
+ checkAllocation(plan, alloc1, start);
+
// Now add another one
ReservationId reservationID2 =
@@ -424,13 +434,17 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID2));
+
+ RLESparseResourceAllocation userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
+
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
- + alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+ + alloc2[i] + i), userCons.getCapacityAtTime(start + i));
}
// Now archive completed reservations
@@ -445,14 +459,8 @@ public class TestInMemoryPlan {
}
Assert.assertNotNull(plan.getReservationById(reservationID1));
Assert.assertNull(plan.getReservationById(reservationID2));
- for (int i = 0; i < alloc1.length; i++) {
- Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
- plan.getTotalCommittedResources(start + i));
- Assert.assertEquals(
- Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
- plan.getConsumptionForUser(user, start + i));
- }
+ checkAllocation(plan, alloc1, start);
+
when(clock.getTime()).thenReturn(107L);
try {
// will remove 1st reservation also as it has fallen out of the archival
@@ -461,12 +469,16 @@ public class TestInMemoryPlan {
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
+
+ userCons =
+ plan.getConsumptionForUserOverTime(user, start, start + alloc1.length);
+
Assert.assertNull(plan.getReservationById(reservationID1));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
- plan.getConsumptionForUser(user, start + i));
+ userCons.getCapacityAtTime(start + i));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/742632e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index cb4eaeb..f81e7ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -86,6 +89,7 @@ public class TestGreedyReservationAgent {
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
+
agent = new GreedyReservationAgent();
QueueMetrics queueMetrics = mock(QueueMetrics.class);
@@ -135,6 +139,94 @@ public class TestGreedyReservationAgent {
}
+ @SuppressWarnings("javadoc")
+ @Test
+ public void testSharingPolicyFeedback() throws PlanningException {
+
+ prepareBasicPlan();
+
+ // let's constrain the instantaneous allocation and see the
+ // policy kicking in during planning
+ float instConstraint = 40;
+ float avgConstraint = 40;
+
+ ReservationSchedulerConfiguration conf =
+ ReservationSystemTestUtil.createConf(plan.getQueueName(), 100000,
+ instConstraint, avgConstraint);
+
+ plan.getSharingPolicy().init(plan.getQueueName(), conf);
+
+ // create a request with a single atomic ask
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(5 * step);
+ rr.setDeadline(100 * step);
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
+ 10 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID, "u3", plan, rr);
+
+ ReservationId reservationID2 =
+ ReservationSystemTestUtil.getNewReservationId();
+ agent.createReservation(reservationID2, "u3", plan, rr);
+
+ ReservationDefinition rr3 = new ReservationDefinitionPBImpl();
+ rr3.setArrival(5 * step);
+ rr3.setDeadline(100 * step);
+ ReservationRequest r3 =
+ ReservationRequest.newInstance(Resource.newInstance(2048, 2), 45, 45,
+ 10 * step);
+ ReservationRequests reqs3 = new ReservationRequestsPBImpl();
+ reqs3.setReservationResources(Collections.singletonList(r3));
+ rr3.setReservationRequests(reqs3);
+
+ ReservationId reservationID3 =
+ ReservationSystemTestUtil.getNewReservationId();
+ try {
+ // RR3 is simply too big to fit
+ agent.createReservation(reservationID3, "u3", plan, rr3);
+ fail();
+ } catch (PlanningException pe) {
+ // expected
+ }
+
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 4);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+ ReservationAllocation cs2 = plan.getReservationById(reservationID2);
+ ReservationAllocation cs3 = plan.getReservationById(reservationID3);
+
+ assertNotNull(cs);
+ assertNotNull(cs2);
+ assertNull(cs3);
+
+ System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ for (long i = 90 * step; i < 100 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
+ // RR2 is pushed out by the presence of RR
+ for (long i = 80 * step; i < 90 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs2.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 20, 2 * 20)));
+ }
+ }
+
@Test
public void testOrder() throws PlanningException {
prepareBasicPlan();
@@ -186,7 +278,6 @@ public class TestGreedyReservationAgent {
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
-
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
@@ -376,7 +467,6 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
-
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());