You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:17 UTC
[02/33] git commit: YARN-1711. Policy to enforce instantaneous and
over-time quotas on user reservation. Contributed by Carlo Curino and Subru
Krishnan. (cherry picked from commit
c4918cb4cb5a267a8cfd6eace28fcfe7ad6174e8) (cherry picked from commit b6df0
YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan.
(cherry picked from commit c4918cb4cb5a267a8cfd6eace28fcfe7ad6174e8)
(cherry picked from commit b6df0dddcdafd7ec67c76ea92aea3ff3e94db247)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6bfdaf06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6bfdaf06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6bfdaf06
Branch: refs/heads/branch-2
Commit: 6bfdaf06c496c9db87c054977617c16e372325ac
Parents: 6a3c167
Author: carlo curino <Carlo Curino>
Authored: Tue Sep 16 13:20:57 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:21:05 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 3 +
.../reservation/CapacityOverTimePolicy.java | 231 +++++++++++++++++++
.../reservation/NoOverCommitPolicy.java | 74 ++++++
.../reservation/SharingPolicy.java | 49 ++++
.../exceptions/ContractValidationException.java | 9 +-
.../exceptions/MismatchedUserException.java | 28 +++
.../exceptions/PlanningException.java | 9 +-
.../exceptions/PlanningQuotaException.java | 28 +++
.../exceptions/ResourceOverCommitException.java | 28 +++
.../CapacitySchedulerConfiguration.java | 154 +++++++++++++
.../reservation/TestCapacityOverTimePolicy.java | 222 ++++++++++++++++++
.../reservation/TestNoOverCommitPolicy.java | 144 ++++++++++++
12 files changed, 977 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index deece7c..e9ec691 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -14,3 +14,6 @@ subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
curino)
+
+YARN-1711. Policy to enforce instantaneous and over-time quotas
+on user reservations. (Carlo Curino and Subru Krishnan via curino)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
new file mode 100644
index 0000000..38c0207
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java
@@ -0,0 +1,231 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Date;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This policy enforces a time-extended notion of Capacity. In particular it
+ * guarantees that the allocation received in input, when combined with all
+ * previous allocations for the user, does not violate an instantaneous max
+ * limit on the resources received, and that for every window of time of length
+ * validWindow, the integral of the allocations for a user (sum of the currently
+ * submitted allocation and all prior allocations for the user) does not exceed
+ * validWindow * maxAvg.
+ *
+ * This allows flexibility, in the sense that an allocation can instantaneously
+ * use large portions of the available capacity, but prevents abuses by bounding
+ * the average use over time.
+ *
+ * By controlling maxInst, maxAvg, validWindow the administrator configuring
+ * this policy can obtain a behavior ranging from instantaneously enforced
+ * capacity (akin to existing queues) to fully flexible allocations (likely
+ * reserved to super-users, or trusted systems).
+ *
+ * Implementation notes: init() reads maxInst and maxAvg from the configuration
+ * and divides them by 100, so they are applied as fractions of the plan total
+ * capacity; validate() also rejects allocations that would overcommit the
+ * plan as a whole. The integral is sampled once per plan step, and the bound
+ * is maxAvg * totalCapacity * (validWindow / step), using integer division.
+ *
+ * NOTE(review): validWindow is presumably expressed in milliseconds (the
+ * quota error message prints validWindow / 1000 as seconds) -- confirm.
+ * NOTE(review): validate() assumes a positive plan step; a step of zero would
+ * fail the validWindow / step division -- confirm the plan guarantees this.
+ * NOTE(review): contributions are expired only once t > startTime, so the
+ * sample taken at (startTime - validWindow) looks like it is never removed
+ * from the running total -- confirm this is the intended window boundary.
+ * NOTE(review): when updating a reservation, the expiry step does not appear
+ * to compensate for the old version of the reservation at (t - validWindow),
+ * although its current contribution is subtracted -- verify against
+ * Plan.getConsumptionForUser semantics.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class CapacityOverTimePolicy implements SharingPolicy {
+
+ private CapacitySchedulerConfiguration conf;
+ private long validWindow;
+ private float maxInst;
+ private float maxAvg;
+
+ // For now this is CapacityScheduler specific, but given a hierarchy in the
+ // configuration structure of the schedulers (e.g., SchedulerConfiguration)
+ // it should be easy to remove this limitation
+ @Override
+ public void init(String reservationQueuePath, Configuration conf) {
+ this.conf = (CapacitySchedulerConfiguration) conf;
+ validWindow = this.conf.getReservationWindow(reservationQueuePath);
+ maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
+ maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
+ };
+
+ @Override
+ public void validate(Plan plan, ReservationAllocation reservation)
+ throws PlanningException {
+
+ // this entire method is invoked under a write-lock on the plan, no need
+ // to synchronize accesses to the plan further
+
+ // Try to verify whether there is already a reservation with this ID in
+ // the system; if so, its contribution is removed during validation so
+ // that the check simulates a try-n-swap
+ // update.
+ ReservationAllocation oldReservation =
+ plan.getReservationById(reservation.getReservationId());
+
+ // sanity check that the update of a reservation is not changing username
+ if (oldReservation != null
+ && !oldReservation.getUser().equals(reservation.getUser())) {
+ throw new MismatchedUserException(
+ "Updating an existing reservation with mismatched user:"
+ + oldReservation.getUser() + " != " + reservation.getUser());
+ }
+
+ long startTime = reservation.getStartTime();
+ long endTime = reservation.getEndTime();
+ long step = plan.getStep();
+
+ Resource planTotalCapacity = plan.getTotalCapacity();
+
+ Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
+ Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
+
+ // define variable that will store integral of resources (need diff class to
+ // avoid overflow issues for long/large allocations)
+ IntegralResource runningTot = new IntegralResource(0L, 0L);
+ IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
+ maxAllowed.multiplyBy(validWindow / step);
+
+ // check that the resources offered to the user during any window of length
+ // "validWindow" overlapping this allocation are within maxAllowed
+ // also enforce instantaneous and physical constraints during this pass
+ for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
+
+ Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
+ Resource currExistingAllocForUser =
+ plan.getConsumptionForUser(reservation.getUser(), t);
+ Resource currNewAlloc = reservation.getResourcesAtTime(t);
+ Resource currOldAlloc = Resources.none();
+ if (oldReservation != null) {
+ currOldAlloc = oldReservation.getResourcesAtTime(t);
+ }
+
+ // throw exception if the cluster is overcommitted
+ // tot_allocated - old + new > capacity
+ Resource inst =
+ Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
+ currOldAlloc);
+ if (Resources.greaterThan(plan.getResourceCalculator(),
+ planTotalCapacity, inst, planTotalCapacity)) {
+ throw new ResourceOverCommitException(" Resources at time " + t
+ + " would be overcommitted (" + inst + " over "
+ + plan.getTotalCapacity() + ") by accepting reservation: "
+ + reservation.getReservationId());
+ }
+
+ // throw exception if instantaneous limits are violated
+ // tot_alloc_to_this_user - old + new > inst_limit
+ if (Resources.greaterThan(plan.getResourceCalculator(),
+ planTotalCapacity, Resources.subtract(
+ Resources.add(currExistingAllocForUser, currNewAlloc),
+ currOldAlloc), maxInsRes)) {
+ throw new PlanningQuotaException("Instantaneous quota capacity "
+ + maxInst + " would be passed at time " + t
+ + " by accepting reservation: " + reservation.getReservationId());
+ }
+
+ // throw exception if the running integral of utilization over validWindow
+ // is violated. We perform a delta check, adding/removing instants at the
+ // boundary of the window from runningTot.
+
+ // runningTot = previous_runningTot + currExistingAllocForUser +
+ // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
+
+ // Where:
+ // 1) currNewAlloc, currExistingAllocForUser represent the contribution of
+ // the instant in time added in this pass.
+ // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
+ // instants that are being retired from the window
+ // 3) currOldAlloc is the contribution (if any) of the previous version of
+ // this reservation (the one we are updating)
+
+ runningTot.add(currExistingAllocForUser);
+ runningTot.add(currNewAlloc);
+ runningTot.subtract(currOldAlloc);
+
+ // expire contributions from instant in time before (t - validWindow)
+ if (t > startTime) {
+ Resource pastOldAlloc =
+ plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
+ Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
+
+ // runningTot = runningTot - pastOldAlloc - pastNewAlloc;
+ runningTot.subtract(pastOldAlloc);
+ runningTot.subtract(pastNewAlloc);
+ }
+
+ // check integral
+ // runningTot > maxAvg * validWindow
+ // NOTE: we need to use comparator of IntegralResource directly, as
+ // Resource and ResourceCalculator assume "int" amount of resources,
+ // which is not sufficient when comparing integrals (out-of-bound)
+ if (maxAllowed.compareTo(runningTot) < 0) {
+ throw new PlanningQuotaException(
+ "Integral (avg over time) quota capacity " + maxAvg
+ + " over a window of " + validWindow / 1000 + " seconds, "
+ + " would be passed at time " + t + "(" + new Date(t)
+ + ") by accepting reservation: "
+ + reservation.getReservationId());
+ }
+ }
+ }
+
+ @Override
+ public long getValidWindow() {
+ return validWindow;
+ }
+
+ /**
+ * This class provides support for Resource-like book-keeping, based on
+ * long(s), as using Resource to store the "integral" of the allocation over
+ * time leads to integer overflows for large allocations/clusters. (Evolving
+ * Resource to use long is too disruptive at this point.)
+ *
+ * The comparison/multiplication behaviors of IntegralResource are meant to be
+ * consistent with the DefaultResourceCalculator: compareTo orders instances
+ * by memory and looks at vcores only when the memory amounts are equal. The
+ * value returned by compareTo is a raw difference, so only its sign is
+ * meaningful.
+ */
+ public class IntegralResource {
+ long memory;
+ long vcores;
+
+ public IntegralResource(Resource resource) {
+ this.memory = resource.getMemory();
+ this.vcores = resource.getVirtualCores();
+ }
+
+ public IntegralResource(long mem, long vcores) {
+ this.memory = mem;
+ this.vcores = vcores;
+ }
+
+ public void add(Resource r) {
+ memory += r.getMemory();
+ vcores += r.getVirtualCores();
+ }
+
+ public void subtract(Resource r) {
+ memory -= r.getMemory();
+ vcores -= r.getVirtualCores();
+ }
+
+ public void multiplyBy(long window) {
+ memory = memory * window;
+ vcores = vcores * window;
+ }
+
+ public long compareTo(IntegralResource other) {
+ long diff = memory - other.memory;
+ if (diff == 0) {
+ diff = vcores - other.vcores;
+ }
+ return diff;
+ }
+
+ @Override
+ public String toString() {
+ return "<memory:" + memory + ", vCores:" + vcores + ">";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
new file mode 100644
index 0000000..cbe2b78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -0,0 +1,74 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This policy enforces a simple physical cluster capacity constraint, by
+ * validating that the proposed allocation fits in the current plan. The
+ * validation is compatible with "updates": while verifying the capacity
+ * constraint it conceptually removes the prior version of the reservation.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class NoOverCommitPolicy implements SharingPolicy {
+
+  /**
+   * Verifies that accepting the given reservation never commits more than the
+   * total capacity of the plan, sampling every plan step in
+   * [startTime, endTime) of the reservation.
+   *
+   * @param plan the {@link Plan} to validate against
+   * @param reservation the allocation proposed to be added to (or updated in)
+   *          the plan
+   * @throws MismatchedUserException if a reservation with the same id exists
+   *           in the plan and belongs to a different user
+   * @throws ResourceOverCommitException if at some sampled instant the
+   *           resources committed in the plan, plus the new allocation, minus
+   *           the prior version of this reservation, exceed the plan total
+   *           capacity
+   */
+  @Override
+  public void validate(Plan plan, ReservationAllocation reservation)
+      throws PlanningException {
+
+    ReservationAllocation oldReservation =
+        plan.getReservationById(reservation.getReservationId());
+
+    // check updates are using same name
+    if (oldReservation != null
+        && !oldReservation.getUser().equals(reservation.getUser())) {
+      throw new MismatchedUserException(
+          "Updating an existing reservation with mismatching user:"
+              + oldReservation.getUser() + " != " + reservation.getUser());
+    }
+
+    long startTime = reservation.getStartTime();
+    long endTime = reservation.getEndTime();
+    long step = plan.getStep();
+
+    // for every instant in time, check we are respecting cluster capacity
+    for (long t = startTime; t < endTime; t += step) {
+      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
+      Resource currNewAlloc = reservation.getResourcesAtTime(t);
+
+      // When updating, discount the prior version of this reservation. The
+      // value must be assigned: previously the lookup result was discarded,
+      // so updates were validated as if the old allocation stayed in place.
+      Resource currOldAlloc = Resource.newInstance(0, 0);
+      if (oldReservation != null) {
+        currOldAlloc = oldReservation.getResourcesAtTime(t);
+      }
+
+      // check the cluster is never over committed
+      // currExistingAllocTot + currNewAlloc - currOldAlloc >
+      // capPlan.getTotalCapacity()
+      if (Resources.greaterThan(plan.getResourceCalculator(), plan
+          .getTotalCapacity(), Resources.subtract(
+          Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
+          plan.getTotalCapacity())) {
+        throw new ResourceOverCommitException("Resources at time " + t
+            + " would be overcommitted by " + "accepting reservation: "
+            + reservation.getReservationId());
+      }
+    }
+  }
+
+  /**
+   * @return always zero, since this policy only looks at the instants covered
+   *         by the reservation being validated
+   */
+  @Override
+  public long getValidWindow() {
+    // this policy has no "memory" so the valid window is set to zero
+    return 0;
+  }
+
+  /**
+   * No-op: this policy reads nothing from the configuration.
+   *
+   * @param inventoryQueuePath ignored
+   * @param conf ignored
+   */
+  @Override
+  public void init(String inventoryQueuePath, Configuration conf) {
+    // nothing to do for this policy
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
new file mode 100644
index 0000000..d917764
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -0,0 +1,49 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * This is the interface for policies that validate new
+ * {@link ReservationAllocation}s for allocations being added to a {@link Plan}.
+ * Individual policies will be enforcing different invariants.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public interface SharingPolicy {
+
+ /**
+ * Initialize this policy
+ *
+ * @param inventoryQueuePath the name of the queue for this plan
+ * @param conf the system configuration
+ */
+ public void init(String inventoryQueuePath, Configuration conf);
+
+ /**
+ * This method runs the policy validation logic. It does not return a value:
+ * a {@link ReservationAllocation} that is not acceptable according to this
+ * sharing policy is reported by throwing a {@link PlanningException}.
+ *
+ * @param plan the {@link Plan} we validate against
+ * @param newAllocation the allocation proposed to be added to the
+ * {@link Plan}
+ * @throws PlanningException if the policy would not be respected by adding
+ * this {@link ReservationAllocation} to the {@link Plan}
+ */
+ public void validate(Plan plan, ReservationAllocation newAllocation)
+ throws PlanningException;
+
+ /**
+ * Returns the time range before and after the current reservation considered
+ * by this policy. In particular, this informs the archival process for the
+ * {@link Plan}, i.e., reservations regarding times before (now - validWindow)
+ * can be deleted.
+ *
+ * @return validWindow the window of validity considered by the policy.
+ */
+ public long getValidWindow();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
index 7ee5a76..cd82a9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
@@ -1,5 +1,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception is thrown if the request made is not syntactically valid.
+ */
+@Public
+@Unstable
public class ContractValidationException extends PlanningException {
private static final long serialVersionUID = 1L;
@@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException {
super(message);
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
new file mode 100644
index 0000000..0a443f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exception thrown when an update to an existing reservation carries a user
+ * that differs from the user of the reservation already stored under the same
+ * id, i.e., the update is not performed by the reservation owner.
+ */
+@Public
+@Unstable
+public class MismatchedUserException extends PlanningException {
+
+ private static final long serialVersionUID = 8313222590561668413L;
+
+ public MismatchedUserException(String message) {
+ super(message);
+ }
+
+ public MismatchedUserException(Throwable cause) {
+ super(cause);
+ }
+
+ public MismatchedUserException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
index aa9e9fb..0699856 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
@@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
/**
* Exception thrown by the admission control subsystem when there is a problem
- * in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
+ * in trying to find an allocation for a user
+ * {@link ReservationSubmissionRequest}.
*/
+
+@Public
+@Unstable
public class PlanningException extends Exception {
private static final long serialVersionUID = -684069387367879218L;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
new file mode 100644
index 0000000..aad4ee8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception is thrown if the user quota would be exceeded by accepting
+ * or updating a reservation.
+ */
+@Public
+@Unstable
+public class PlanningQuotaException extends PlanningException {
+
+ private static final long serialVersionUID = 8206629288380246166L;
+
+ public PlanningQuotaException(String message) {
+ super(message);
+ }
+
+ public PlanningQuotaException(Throwable cause) {
+ super(cause);
+ }
+
+ public PlanningQuotaException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
new file mode 100644
index 0000000..a4c2b07
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This exception indicates that the reservation that has been attempted would
+ * exceed the physical resources available in the {@code Plan} at the moment.
+ */
+@Public
+@Unstable
+public class ResourceOverCommitException extends PlanningException {
+
+ private static final long serialVersionUID = 7070699407526521032L;
+
+ public ResourceOverCommitException(String message) {
+ super(message);
+ }
+
+ public ResourceOverCommitException(Throwable cause) {
+ super(cause);
+ }
+
+ public ResourceOverCommitException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 5542ef3..ba501b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -190,6 +190,63 @@ public class CapacitySchedulerConfiguration extends Configuration {
}
}
+ @Private
+ public static final String AVERAGE_CAPACITY = "average-capacity";
+
+ @Private
+ public static final String IS_RESERVABLE = "reservable";
+
+ @Private
+ public static final String RESERVATION_WINDOW = "reservation-window";
+
+ @Private
+ public static final String INSTANTANEOUS_MAX_CAPACITY =
+ "instantaneous-max-capacity";
+
+ @Private
+ public static final long DEFAULT_RESERVATION_WINDOW = 0L;
+
+ @Private
+ public static final String RESERVATION_ADMISSION_POLICY =
+ "reservation-policy";
+
+ @Private
+ public static final String RESERVATION_AGENT_NAME = "reservation-agent";
+
+ @Private
+ public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
+ "show-reservations-as-queues";
+
+ @Private
+ public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
+
+ @Private
+ public static final String DEFAULT_RESERVATION_AGENT_NAME =
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+
+ @Private
+ public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
+
+ @Private
+ public static final String DEFAULT_RESERVATION_PLANNER_NAME =
+ "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+
+ @Private
+ public static final String RESERVATION_MOVE_ON_EXPIRY =
+ "reservation-move-on-expiry";
+
+ @Private
+ public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
+
+ @Private
+ public static final String RESERVATION_ENFORCEMENT_WINDOW =
+ "reservation-enforcement-window";
+
+ // default to 1h lookahead enforcement
+ @Private
+ public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
+
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@@ -511,4 +568,101 @@ public class CapacitySchedulerConfiguration extends Configuration {
return mappings;
}
+
+ /**
+  * Tells whether the given queue is flagged as reservable.
+  *
+  * @param queue queue whose configuration prefix is looked up
+  * @return the boolean stored under the queue's {@code reservable} key, or
+  *         {@code false} when that key is not set
+  */
+ public boolean isReservable(String queue) {
+   return getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
+ }
+
+ public void setReservable(String queue, boolean isReservable) {
+ setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
+ LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
+ + ", isReservableQueue=" + isReservable(queue));
+ }
+
+ public long getReservationWindow(String queue) {
+ long reservationWindow =
+ getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
+ DEFAULT_RESERVATION_WINDOW);
+ return reservationWindow;
+ }
+
+ public float getAverageCapacity(String queue) {
+ float avgCapacity =
+ getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
+ MAXIMUM_CAPACITY_VALUE);
+ return avgCapacity;
+ }
+
+ public float getInstantaneousMaxCapacity(String queue) {
+ float instMaxCapacity =
+ getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
+ MAXIMUM_CAPACITY_VALUE);
+ return instMaxCapacity;
+ }
+
+ public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
+ setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
+ instMaxCapacity);
+ }
+
+ public void setReservationWindow(String queue, long reservationWindow) {
+ setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
+ }
+
+ public void setAverageCapacity(String queue, float avgCapacity) {
+ setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
+ }
+
+ /**
+  * Returns the admission policy class name configured for the given queue.
+  *
+  * @param queue queue whose configuration prefix is looked up
+  * @return the string stored under the queue's {@code reservation-policy}
+  *         key, or {@link #DEFAULT_RESERVATION_ADMISSION_POLICY} when that
+  *         key is not set
+  */
+ public String getReservationAdmissionPolicy(String queue) {
+   String key = getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY;
+   return get(key, DEFAULT_RESERVATION_ADMISSION_POLICY);
+ }
+
+ public void setReservationAdmissionPolicy(String queue,
+ String reservationPolicy) {
+ set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
+ }
+
+ /**
+  * Returns the reservation agent class name configured for the given queue.
+  *
+  * @param queue queue whose configuration prefix is looked up
+  * @return the string stored under the queue's {@code reservation-agent}
+  *         key, or {@link #DEFAULT_RESERVATION_AGENT_NAME} when that key is
+  *         not set
+  */
+ public String getReservationAgent(String queue) {
+   String key = getQueuePrefix(queue) + RESERVATION_AGENT_NAME;
+   return get(key, DEFAULT_RESERVATION_AGENT_NAME);
+ }
+
+ /**
+  * Stores the reservation agent class name for the given queue.
+  *
+  * @param queue queue whose configuration prefix is used to build the key
+  * @param reservationAgent value to store under the queue's
+  *          {@code reservation-agent} key (this is the agent, not the
+  *          admission policy, which has its own setter)
+  */
+ public void setReservationAgent(String queue, String reservationAgent) {
+   set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationAgent);
+ }
+
+ public boolean getShowReservationAsQueues(String queuePath) {
+ boolean showReservationAsQueues =
+ getBoolean(getQueuePrefix(queuePath)
+ + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false);
+ return showReservationAsQueues;
+ }
+
+ /**
+  * Returns the replanner class name configured for the given queue.
+  *
+  * @param queue queue whose configuration prefix is looked up
+  * @return the string stored under the queue's {@code reservation-planner}
+  *         key, or {@link #DEFAULT_RESERVATION_PLANNER_NAME} when that key
+  *         is not set
+  */
+ public String getReplanner(String queue) {
+   String key = getQueuePrefix(queue) + RESERVATION_PLANNER_NAME;
+   return get(key, DEFAULT_RESERVATION_PLANNER_NAME);
+ }
+
+ public boolean getMoveOnExpiry(String queue) {
+ boolean killOnExpiry =
+ getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
+ DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
+ return killOnExpiry;
+ }
+
+ public long getEnforcementWindow(String queue) {
+ long enforcementWindow =
+ getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
+ DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
+ return enforcementWindow;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
new file mode 100644
index 0000000..83d6d3f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -0,0 +1,222 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCapacityOverTimePolicy {
+
+ long timeWindow;
+ long step;
+ float avgConstraint;
+ float instConstraint;
+ long initTime;
+
+ InMemoryPlan plan;
+ ReservationAgent mAgent;
+ Resource minAlloc;
+ ResourceCalculator res;
+ Resource maxAlloc;
+
+ int totCont = 1000000;
+
+ @Before
+ public void setup() throws Exception {
+
+ // 24h window
+ timeWindow = 86400000L;
+ // 1 sec step
+ step = 1000L;
+
+ // 25% avg cap on capacity
+ avgConstraint = 25;
+
+ // 70% instantaneous cap on capacity
+ instConstraint = 70;
+
+ initTime = System.currentTimeMillis();
+ minAlloc = Resource.newInstance(1024, 1);
+ res = new DefaultResourceCalculator();
+ maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+ mAgent = mock(ReservationAgent.class);
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
+ String reservationQ = testUtil.getFullReservationQueueName();
+ CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+ capConf.setReservationWindow(reservationQ, timeWindow);
+ capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint);
+ capConf.setAverageCapacity(reservationQ, avgConstraint);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, capConf);
+
+ plan =
+ new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+ scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
+ "dedicated", null, true);
+ }
+
+ public int[] generateData(int length, int val) {
+ int[] data = new int[length];
+ for (int i = 0; i < length; i++) {
+ data[i] = val;
+ }
+ return data;
+ }
+
+ @Test
+ public void testSimplePass() throws IOException, PlanningException {
+ // generate allocation that simply fit within all constraints
+ int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ }
+
+ @Test
+ public void testSimplePass2() throws IOException, PlanningException {
+ // generate allocation from single tenant that exceed avg momentarily but
+ // fit within
+ // max instantanesou
+ int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ }
+
+ @Test
+ public void testMultiTenantPass() throws IOException, PlanningException {
+ // generate allocation from multiple tenants that barely fit in tot capacity
+ int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+ for (int i = 0; i < 4; i++) {
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ }
+ }
+
+ @Test(expected = ResourceOverCommitException.class)
+ public void testMultiTenantFail() throws IOException, PlanningException {
+ // generate allocation from multiple tenants that exceed tot capacity
+ int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+ for (int i = 0; i < 5; i++) {
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ }
+ }
+
+ @Test(expected = PlanningQuotaException.class)
+ public void testInstFail() throws IOException, PlanningException {
+ // generate allocation that exceed the instantaneous cap single-show
+ int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ Assert.fail("should not have accepted this");
+ }
+
+ @Test
+ public void testInstFailBySum() throws IOException, PlanningException {
+ // generate allocation that exceed the instantaneous cap by sum
+ int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ try {
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + f.length,
+ ReservationSystemTestUtil.generateAllocation(initTime, step, f),
+ res, minAlloc)));
+ Assert.fail();
+ } catch (PlanningQuotaException p) {
+ // expected
+ }
+ }
+
+ @Test(expected = PlanningQuotaException.class)
+ public void testFailAvg() throws IOException, PlanningException {
+ // generate an allocation which violates the 25% average single-shot
+ Map<ReservationInterval, ReservationRequest> req =
+ new TreeMap<ReservationInterval, ReservationRequest>();
+ long win = timeWindow / 2 + 100;
+ int cont = (int) Math.ceil(0.5 * totCont);
+ req.put(new ReservationInterval(initTime, initTime + win),
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + win, req, res, minAlloc)));
+ }
+
+ @Test
+ public void testFailAvgBySum() throws IOException, PlanningException {
+ // generate an allocation which violates the 25% average by sum
+ Map<ReservationInterval, ReservationRequest> req =
+ new TreeMap<ReservationInterval, ReservationRequest>();
+ long win = 86400000 / 4 + 1;
+ int cont = (int) Math.ceil(0.5 * totCont);
+ req.put(new ReservationInterval(initTime, initTime + win),
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + win, req, res, minAlloc)));
+
+ try {
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", initTime, initTime + win, req, res, minAlloc)));
+
+ Assert.fail("should not have accepted this");
+ } catch (PlanningQuotaException e) {
+ // expected
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6bfdaf06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
new file mode 100644
index 0000000..2ceead3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -0,0 +1,144 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link NoOverCommitPolicy} by adding reservations to an
+ * {@link InMemoryPlan} backed by the scheduler obtained from
+ * {@code ReservationSystemTestUtil.mockCapacityScheduler(totCont)}.
+ */
+public class TestNoOverCommitPolicy {
+
+  long step;
+  long initTime;
+
+  InMemoryPlan plan;
+  ReservationAgent mAgent;
+  Resource minAlloc;
+  ResourceCalculator res;
+  Resource maxAlloc;
+
+  int totCont = 1000000;
+
+  /** Builds a fresh plan governed by a NoOverCommitPolicy. */
+  @Before
+  public void setup() throws Exception {
+
+    // 1 sec step
+    step = 1000L;
+
+    initTime = System.currentTimeMillis();
+    minAlloc = Resource.newInstance(1024, 1);
+    res = new DefaultResourceCalculator();
+    maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    mAgent = mock(ReservationAgent.class);
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+    NoOverCommitPolicy policy = new NoOverCommitPolicy();
+    policy.init(reservationQ, capConf);
+
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
+            "dedicated", null, true);
+  }
+
+  /**
+   * Returns an array of {@code length} entries, all equal to {@code val}.
+   */
+  public int[] generateData(int length, int val) {
+    int[] data = new int[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = val;
+    }
+    return data;
+  }
+
+  /**
+   * Builds a reservation with the given id for {@code user} on the
+   * "dedicated" queue from the per-step container counts in {@code f},
+   * starting at {@code initTime}.
+   * NOTE(review): the end time is {@code initTime + f.length}, not
+   * {@code f.length * step}; kept exactly as in the original tests, confirm
+   * it is intended.
+   */
+  private InMemoryReservationAllocation uniformAllocation(ReservationId rid,
+      String user, int[] f) throws IOException, PlanningException {
+    return new InMemoryReservationAllocation(rid, null, user, "dedicated",
+        initTime, initTime + f.length,
+        ReservationSystemTestUtil.generateAllocation(initTime, step, f), res,
+        minAlloc);
+  }
+
+  /** Same as above, with a freshly generated reservation id. */
+  private InMemoryReservationAllocation uniformAllocation(String user, int[] f)
+      throws IOException, PlanningException {
+    return uniformAllocation(ReservationSystemTestUtil.getNewReservationId(),
+        user, f);
+  }
+
+  @Test
+  public void testSingleUserEasyFitPass() throws IOException, PlanningException {
+    // generate an allocation that easily fits within resource constraints
+    int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
+    assertTrue(plan.toString(),
+        plan.addReservation(uniformAllocation("u1", f)));
+  }
+
+  @Test
+  public void testSingleUserBarelyFitPass() throws IOException,
+      PlanningException {
+    // generate an allocation from a single tenant that barely fits
+    int[] f = generateData(3600, totCont);
+    assertTrue(plan.toString(),
+        plan.addReservation(uniformAllocation("u1", f)));
+  }
+
+  @Test(expected = ResourceOverCommitException.class)
+  public void testSingleFail() throws IOException, PlanningException {
+    // generate an allocation from a single tenant that exceeds capacity
+    int[] f = generateData(3600, (int) (1.1 * totCont));
+    plan.addReservation(uniformAllocation("u1", f));
+  }
+
+  @Test(expected = MismatchedUserException.class)
+  public void testUserMismatch() throws IOException, PlanningException {
+    // generate an allocation for user u1 that uses half of the capacity
+    int[] f = generateData(3600, (int) (0.5 * totCont));
+
+    ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
+    plan.addReservation(uniformAllocation(rid, "u1", f));
+
+    // trying to update the same reservation id with a mismatching user
+    plan.updateReservation(uniformAllocation(rid, "u2", f));
+  }
+
+  @Test
+  public void testMultiTenantPass() throws IOException, PlanningException {
+    // generate allocations from multiple tenants that barely fit in the
+    // total capacity
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 4; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(uniformAllocation("u" + i, f)));
+    }
+  }
+
+  @Test
+  public void testMultiTenantFail() throws IOException, PlanningException {
+    // four tenants at 25% each fill the plan; a rejection of any of them is
+    // a test failure rather than the expected outcome
+    int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
+    for (int i = 0; i < 4; i++) {
+      assertTrue(plan.toString(),
+          plan.addReservation(uniformAllocation("u" + i, f)));
+    }
+    // only the fifth tenant, which exceeds the total capacity, must be
+    // rejected
+    try {
+      plan.addReservation(uniformAllocation("u4", f));
+      // fully qualified: this file only statically imports assertTrue
+      org.junit.Assert.fail("should not have accepted this");
+    } catch (ResourceOverCommitException e) {
+      // expected
+    }
+  }
+}