You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:18 UTC
[03/33] git commit: YARN-1710. Logic to find allocations within a
Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and
Subru Krishnan. (cherry picked from commit
aef7928899b37262773f3dc117157bb746bf8918) (cherry picked from commi
YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and Subru Krishnan.
(cherry picked from commit aef7928899b37262773f3dc117157bb746bf8918)
(cherry picked from commit f66ffcf832235e0da0bb050fff08e248b547c360)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a3c1671
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a3c1671
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a3c1671
Branch: refs/heads/branch-2
Commit: 6a3c167175ba98e002b6a67ce73355974c1263da
Parents: 63250ef
Author: carlo curino <Carlo Curino>
Authored: Mon Sep 15 16:56:28 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:21:05 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 12 +-
.../reservation/GreedyReservationAgent.java | 367 ++++++++++++
.../reservation/ReservationAgent.java | 55 ++
.../exceptions/ContractValidationException.java | 12 +
.../reservation/TestGreedyReservationAgent.java | 588 +++++++++++++++++++
5 files changed, 1031 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3c1671/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index 410d974..deece7c 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -4,7 +4,13 @@ CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
-YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
+YARN-1708. Public YARN APIs for creating/updating/deleting
+reservations. (Carlo Curino and Subru Krishnan via subru)
-YARN-1709. In-memory data structures used to track resources over time to
-enable reservations. (subru)
+YARN-1709. In-memory data structures used to track resources over
+time to enable reservations. (Carlo Curino and Subru Krishnan via
+subru)
+
+YARN-1710. Logic to find allocations within a Plan that satisfy
+user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
+curino)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3c1671/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
new file mode 100644
index 0000000..3214f93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
@@ -0,0 +1,367 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationRequest} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadlines to be
+ * scheduled greedily as well. Combined with an opportunistic anticipation of
+ * work if the cluster is not fully utilized, this also seems to provide good
+ * latency for best-effort jobs (i.e., jobs running without a reservation).
+ *
+ * This agent does not account for locality and only considers container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+public class GreedyReservationAgent implements ReservationAgent {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(GreedyReservationAgent.class);
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+    return computeAllocation(reservationId, user, plan, contract, null);
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+    return computeAllocation(reservationId, user, plan, contract,
+        plan.getReservationById(reservationId));
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+    return plan.deleteReservation(reservationId);
+  }
+
+  /**
+   * Places every stage of {@code contract} into {@code plan}, visiting the
+   * stages from last to first and searching from the deadline backward
+   * towards the arrival. It then adds the resulting reservation to the plan,
+   * or updates it when {@code oldReservation} is non-null.
+   *
+   * @param reservationId identifier of the reservation being placed
+   * @param user owner of the reservation
+   * @param plan the plan in which resources are sought
+   * @param contract the user's reservation definition
+   * @param oldReservation the existing allocation being replaced, or
+   *          {@code null} when creating a new reservation; its resources are
+   *          treated as available while placing the new stages
+   * @return the result of {@link Plan#addReservation} or
+   *         {@link Plan#updateReservation}
+   * @throws PlanningException if a stage fails validation, or if no
+   *           allocation satisfying the requested interpreter semantics can
+   *           be found
+   */
+  private boolean computeAllocation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract,
+      ReservationAllocation oldReservation) throws PlanningException {
+    LOG.info("placing the following ReservationRequest: {}", contract);
+
+    Resource totalCapacity = plan.getTotalCapacity();
+
+    // Here we can add logic to adjust the ResourceDefinition to account for
+    // system "imperfections" (e.g., scheduling delays for large containers).
+
+    // Align with plan step conservatively (i.e., ceil arrival, and floor
+    // deadline)
+    long earliestStart = contract.getArrival();
+    long step = plan.getStep();
+    if (earliestStart % step != 0) {
+      earliestStart = earliestStart + (step - (earliestStart % step));
+    }
+    long deadline = contract.getDeadline() - contract.getDeadline() % step;
+
+    // setup temporary variables to handle time-relations between stages and
+    // intermediate answers
+    long curDeadline = deadline;
+    // start time of the most recently placed ORDER/ORDER_NO_GAP stage
+    // (-1 until one has been placed)
+    long oldDeadline = -1;
+
+    Map<ReservationInterval, ReservationRequest> allocations =
+        new HashMap<ReservationInterval, ReservationRequest>();
+    // resources tentatively claimed by the stages of this contract placed so
+    // far; the plan itself is only modified once all stages are placed
+    RLESparseResourceAllocation tempAssigned =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+    List<ReservationRequest> stages = contract.getReservationRequests()
+        .getReservationResources();
+    ReservationRequestInterpreter type = contract.getReservationRequests()
+        .getInterpreter();
+
+    // Iterate the stages backward, starting from the deadline
+    for (ListIterator<ReservationRequest> li =
+        stages.listIterator(stages.size()); li.hasPrevious();) {
+
+      ReservationRequest currentReservationStage = li.previous();
+
+      // validate that the RR respects basic constraints
+      validateInput(plan, currentReservationStage, totalCapacity);
+
+      // run allocation for a single stage
+      Map<ReservationInterval, ReservationRequest> curAlloc =
+          placeSingleStage(plan, tempAssigned, currentReservationStage,
+              earliestStart, curDeadline, oldReservation, totalCapacity);
+
+      if (curAlloc == null) {
+        // no allocation was found for this stage: this is fatal unless the
+        // ReservationDefinition is of type ANY, in which case we move on to
+        // the next (earlier) stage
+        if (type != ReservationRequestInterpreter.R_ANY) {
+          throw new PlanningException("The GreedyAgent"
+              + " couldn't find a valid allocation for your request");
+        }
+        continue;
+      }
+
+      // we found an allocation: add it to the set of allocations
+      allocations.putAll(curAlloc);
+
+      // if this request is of type ANY we are done searching (greedy)
+      // and can use the current allocation (break-out of the search)
+      if (type == ReservationRequestInterpreter.R_ANY) {
+        break;
+      }
+
+      // if the request is of ORDER or ORDER_NO_GAP we constrain the next
+      // round of allocation to precede the current allocation, by setting
+      // curDeadline
+      if (type == ReservationRequestInterpreter.R_ORDER
+          || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+        curDeadline = findEarliestTime(curAlloc.keySet());
+
+        // for ORDER_NO_GAP verify that this stage ends no more than one plan
+        // step before the previously placed stage starts, and fail otherwise
+        // (the greedy procedure failed to find a no-gap allocation)
+        if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
+            && oldDeadline > 0) {
+          if (oldDeadline - findLatestTime(curAlloc.keySet()) > step) {
+            throw new PlanningException("The GreedyAgent"
+                + " couldn't find a valid allocation for your request");
+          }
+        }
+        // keep oldDeadline pointing to the last deadline we found
+        oldDeadline = curDeadline;
+      }
+    }
+
+    // No allocation at all means no stage could be placed: give up and
+    // report failure to the user
+    if (allocations.isEmpty()) {
+      throw new PlanningException("The GreedyAgent"
+          + " couldn't find a valid allocation for your request");
+    }
+
+    // create the reservation from the allocations found above
+    ReservationRequest zeroRes =
+        ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
+
+    long firstStartTime = findEarliestTime(allocations.keySet());
+
+    // add zero-padding from arrival up to the first non-null allocation
+    // to guarantee that the reservation exists starting at arrival
+    if (firstStartTime > earliestStart) {
+      allocations.put(new ReservationInterval(earliestStart,
+          firstStartTime), zeroRes);
+      firstStartTime = earliestStart;
+      // consider adding trailing zeros at the end for symmetry
+    }
+
+    // Actually add/update the reservation in the plan.
+    // This is subject to validation as other agents might be placing
+    // in parallel and there might be sharing policies the agent is not
+    // aware of.
+    ReservationAllocation capReservation =
+        new InMemoryReservationAllocation(reservationId, contract, user,
+            plan.getQueueName(), firstStartTime,
+            findLatestTime(allocations.keySet()), allocations,
+            plan.getResourceCalculator(), plan.getMinimumAllocation());
+    if (oldReservation != null) {
+      return plan.updateReservation(capReservation);
+    }
+    return plan.addReservation(capReservation);
+  }
+
+  /**
+   * Checks that a single stage is well formed. The gang size must be at
+   * least one. The number of containers must be positive and an exact
+   * multiple of the gang size. The container capability must not exceed the
+   * plan's maximum allocation.
+   *
+   * @throws ContractValidationException if any of the checks fails
+   */
+  private void validateInput(Plan plan, ReservationRequest rr,
+      Resource totalCapacity) throws ContractValidationException {
+
+    if (rr.getConcurrency() < 1) {
+      throw new ContractValidationException("Gang Size should be >= 1");
+    }
+
+    if (rr.getNumContainers() <= 0) {
+      throw new ContractValidationException("Num containers should be > 0");
+    }
+
+    // check that gangSize and numContainers are compatible
+    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+      throw new ContractValidationException(
+          "Parallelism must be an exact multiple of gang size");
+    }
+
+    // check that the largest container request does not exceed
+    // the cluster-wide limit for container sizes
+    if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
+        rr.getCapability(), plan.getMaximumAllocation())) {
+      throw new ContractValidationException("Individual"
+          + " capability requests should not exceed cluster's maxAlloc");
+    }
+  }
+
+  /**
+   * Performs the placement of a single (atomic) stage of the reservation.
+   * The key idea is to traverse the plan backward for a "lease-duration"
+   * worth of time and compute the maximum multiple of the concurrency (gang)
+   * parameter that fits. We repeat this, moving towards earlier instants in
+   * time, until the time-window is exhausted or the whole request is placed.
+   *
+   * @return the intervals allocated for this stage, or {@code null} if the
+   *         stage could not be fully placed; in the latter case any intervals
+   *         tentatively added to {@code tempAssigned} are removed again
+   */
+  private Map<ReservationInterval, ReservationRequest> placeSingleStage(
+      Plan plan, RLESparseResourceAllocation tempAssigned,
+      ReservationRequest rr, long earliestStart, long curDeadline,
+      ReservationAllocation oldResAllocation, final Resource totalCapacity) {
+
+    Map<ReservationInterval, ReservationRequest> allocationRequests =
+        new HashMap<ReservationInterval, ReservationRequest>();
+
+    // compute the gang as a resource and get the duration
+    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+    long dur = rr.getDuration();
+    long step = plan.getStep();
+
+    // ceil the duration to the next multiple of the plan step
+    if (dur % step != 0) {
+      dur += (step - (dur % step));
+    }
+
+    // this division has no remainder: validateInput() rejects stages whose
+    // container count is not a multiple of the gang size
+    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+    // loop trying to place until we are done, or we are considering
+    // an invalid range of times
+    while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
+
+      // as we run along we remember how many gangs we can fit, and what
+      // was the most constraining moment in time (we will restart just
+      // after that to place the next batch)
+      int maxGang = gangsToPlace;
+      long minPoint = curDeadline;
+
+      // start placing at deadline (excluded due to [,) interval semantics)
+      // and move backward
+      for (long t = curDeadline - step; t >= curDeadline - dur
+          && maxGang > 0; t = t - step) {
+
+        // As we run along we logically remove the previous allocation for
+        // this reservation (if one existed), making its resources available
+        Resource oldResCap = Resource.newInstance(0, 0);
+        if (oldResAllocation != null) {
+          oldResCap = oldResAllocation.getResourcesAtTime(t);
+        }
+
+        // compute net available resources
+        Resource netAvailableRes = Resources.clone(totalCapacity);
+        Resources.addTo(netAvailableRes, oldResCap);
+        Resources.subtractFrom(netAvailableRes,
+            plan.getTotalCommittedResources(t));
+        Resources.subtractFrom(netAvailableRes,
+            tempAssigned.getCapacityAtTime(t));
+
+        // compute maximum number of gangs we could fit in this instant,
+        // capped by how many gangs we still have to place
+        int curMaxGang =
+            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+                totalCapacity, netAvailableRes, gang));
+        curMaxGang = Math.min(gangsToPlace, curMaxGang);
+
+        // compare with previous max, and set it. also remember *where* we
+        // found the minimum (useful for next attempts)
+        if (curMaxGang <= maxGang) {
+          maxGang = curMaxGang;
+          minPoint = t;
+        }
+      }
+
+      // if we were able to place any gang, record this, and decrement
+      // gangsToPlace
+      if (maxGang > 0) {
+        gangsToPlace -= maxGang;
+
+        ReservationInterval reservationInt =
+            new ReservationInterval(curDeadline - dur, curDeadline);
+        ReservationRequest reservationRes =
+            ReservationRequest.newInstance(rr.getCapability(),
+                rr.getConcurrency() * maxGang, rr.getConcurrency(),
+                rr.getDuration());
+        // remember occupied space (plan is read-only till we find a plausible
+        // allocation for the entire request). This is needed since we might
+        // be placing other ReservationRequests within the same
+        // ReservationDefinition, and we must avoid double-counting the
+        // available resources
+        tempAssigned.addInterval(reservationInt, reservationRes);
+        allocationRequests.put(reservationInt, reservationRes);
+      }
+
+      // reset our new starting point (curDeadline) to the most constraining
+      // point so far; we will look "left" of that to find more places where
+      // to schedule gangs (for sure nothing on the "right" of this point can
+      // fit a full gang)
+      curDeadline = minPoint;
+    }
+
+    // if no gangs are left to place we succeed and return the allocation
+    if (gangsToPlace == 0) {
+      return allocationRequests;
+    }
+
+    // We did not manage to satisfy this request, so we remove the unwanted
+    // side-effects from tempAssigned (needed for ANY, where other stages may
+    // still be tried).
+    for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
+        allocationRequests.entrySet()) {
+      tempAssigned.removeInterval(tempAllocation.getKey(),
+          tempAllocation.getValue());
+    }
+    // and return null to signal failure in this allocation
+    return null;
+  }
+
+  // finds the leftmost point of this set of ReservationIntervals
+  private long findEarliestTime(Set<ReservationInterval> resInt) {
+    long ret = Long.MAX_VALUE;
+    for (ReservationInterval s : resInt) {
+      if (s.getStartTime() < ret) {
+        ret = s.getStartTime();
+      }
+    }
+    return ret;
+  }
+
+  // finds the rightmost point of this set of ReservationIntervals
+  private long findLatestTime(Set<ReservationInterval> resInt) {
+    long ret = Long.MIN_VALUE;
+    for (ReservationInterval s : resInt) {
+      if (s.getEndTime() > ret) {
+        ret = s.getEndTime();
+      }
+    }
+    return ret;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3c1671/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
new file mode 100644
index 0000000..fe1941d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
@@ -0,0 +1,55 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy a user's contract.
+ */
+public interface ReservationAgent {
+
+  /**
+   * Create a reservation for the user that abides by the specified contract.
+   *
+   * @param reservationId the identifier of the reservation to be created
+   * @param user the user who wants to create the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for the
+   *          reservation
+   *
+   * @return whether the create operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the
+   *           plan
+   */
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Update a reservation for the user that abides by the specified contract.
+   *
+   * @param reservationId the identifier of the reservation to be updated
+   * @param user the user who wants to update the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for the
+   *          reservation
+   *
+   * @return whether the update operation was successful or not
+   * @throws PlanningException if the updated reservation cannot be fitted
+   *           into the plan
+   */
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Delete a user reservation.
+   *
+   * @param reservationId the identifier of the reservation to be deleted
+   * @param user the user who wants to delete the reservation
+   * @param plan the Plan from which the reservation must be removed
+   *
+   * @return whether the delete operation was successful or not
+   * @throws PlanningException if an error occurs while deleting the
+   *           reservation from the plan
+   */
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3c1671/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
new file mode 100644
index 0000000..7ee5a76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+/**
+ * A {@link PlanningException} signalling that a user-supplied reservation
+ * contract failed validation.
+ */
+public class ContractValidationException extends PlanningException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception.
+   *
+   * @param message a description of the validation failure
+   */
+  public ContractValidationException(String message) {
+    super(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3c1671/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
new file mode 100644
index 0000000..0b0201d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
@@ -0,0 +1,588 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestGreedyReservationAgent {
+
+ ReservationAgent agent;
+ InMemoryPlan plan;
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ ResourceCalculator res = new DefaultResourceCalculator();
+ Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+ Random rand = new Random();
+ long step;
+
+ @Before
+ public void setup() throws Exception {
+
+ long seed = rand.nextLong();
+ rand.setSeed(seed);
+ Log.info("Running with seed: " + seed);
+
+ // setting completely loose quotas
+ long timeWindow = 1000000L;
+ Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
+ step = 1000L;
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125);
+ String reservationQ = testUtil.getFullReservationQueueName();
+ CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+ capConf.setReservationWindow(reservationQ, timeWindow);
+ capConf.setMaximumCapacity(reservationQ, 100);
+ capConf.setAverageCapacity(reservationQ, 100);
+ CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+ policy.init(reservationQ, capConf, new HashSet<String>());
+ agent = new GreedyReservationAgent();
+
+ QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated",
+ mock(ParentQueue.class), false, capConf);
+
+ plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+ res, minAlloc, maxAlloc, "dedicated", null, true);
+ }
+
+ @SuppressWarnings("javadoc")
+ @Test
+ public void testSimple() throws PlanningException {
+
+ prepareBasicPlan();
+
+ // create a request with a single atomic ask
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(5 * step);
+ rr.setDeadline(20 * step);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 5, 10 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ for (long i = 10 * step; i < 20 * step; i++) {
+ assertTrue(
+ "Agent-based allocation unexpected",
+ Resources.equals(cs.getResourcesAtTime(i),
+ Resource.newInstance(2048 * 10, 2 * 10)));
+ }
+
+ }
+
+ @Test
+ public void testOrder() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a completely utilized segment around time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(70 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 4);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGapImpossible() throws PlanningException {
+ prepareBasicPlan();
+ // create a completely utilized segment at time 30
+ int[] f = { 100, 100 };
+
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(
+ ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+ "dedicated", 30 * step, 30 * step + f.length * step,
+ ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+ res, minAlloc)));
+
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0L);
+
+ rr.setDeadline(70L);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ boolean result = false;
+ try {
+ // submit to agent
+ result = agent.createReservation(reservationID, "u1", plan, rr);
+ fail();
+ } catch (PlanningException p) {
+ // expected
+ }
+
+ // validate
+ assertFalse("Agent-based allocation should have failed", result);
+ assertTrue("Agent-based allocation should have failed", plan
+ .getAllReservations().size() == 3);
+
+ System.out
+ .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testOrderNoGap() throws PlanningException {
+ prepareBasicPlan();
+ // create a chain of 4 RR, mixing gang and non-gang
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(0 * step);
+ rr.setDeadline(60 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 1, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 10, 10, 20 * step);
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ // validate
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
+
+ }
+
+ @Test
+ public void testSingleSliding() throws PlanningException {
+ prepareBasicPlan();
+
+ // create a single request for which we need subsequent (tight) packing.
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 200, 10, 10 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
+
+ System.out.println("--------AFTER packed ALLOCATION (queue: "
+ + reservationID + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAny() throws PlanningException {
+   prepareBasicPlan();
+   // R_ANY request with three alternatives: the last one (110 containers) is
+   // impossible and is considered first; the other two are satisfiable. We
+   // expect the second alternative to be the one that gets allocated.
+
+   ReservationDefinition definition = new ReservationDefinitionPBImpl();
+   definition.setArrival(100 * step);
+   definition.setDeadline(120 * step);
+
+   ReservationRequests alternatives = new ReservationRequestsPBImpl();
+   alternatives.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+   List<ReservationRequest> options = new ArrayList<ReservationRequest>();
+   options.add(ReservationRequest.newInstance(
+       Resource.newInstance(1024, 1), 5, 5, 10 * step));
+   options.add(ReservationRequest.newInstance(
+       Resource.newInstance(2048, 2), 10, 5, 10 * step));
+   options.add(ReservationRequest.newInstance(
+       Resource.newInstance(1024, 1), 110, 110, 10 * step));
+   alternatives.setReservationResources(options);
+   definition.setReservationRequests(alternatives);
+
+   // hand the definition to the agent under a fresh id
+   ReservationId reservationID = ReservationSystemTestUtil
+       .getNewReservationId();
+   boolean accepted =
+       agent.createReservation(reservationID, "u1", plan, definition);
+
+   // the satisfiable second alternative must have been accepted
+   assertTrue("Agent-based allocation failed", accepted);
+   assertTrue("Agent-based allocation failed",
+       plan.getAllReservations().size() == 3);
+
+   ReservationAllocation allocation = plan.getReservationById(reservationID);
+
+   // 10 x <2048MB, 2> expressed as 20 x <1024MB, 1> over the last 10 steps
+   assertTrue(allocation.toString(),
+       check(allocation, 110 * step, 120 * step, 20, 1024, 1));
+
+   System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+       + ")----------");
+   System.out.println(plan.toString());
+   System.out.println(plan.toCumulativeString());
+
+ }
+
+ /**
+  * Submits an R_ANY request whose alternatives are all unsatisfiable and
+  * verifies that the agent rejects it with a PlanningException and leaves
+  * the plan unchanged (only the two reservations from prepareBasicPlan).
+  */
+ @Test
+ public void testAnyImpossible() throws PlanningException {
+   prepareBasicPlan();
+   // create an ANY request, with all impossible alternatives
+   ReservationDefinition rr = new ReservationDefinitionPBImpl();
+   rr.setArrival(100L);
+   rr.setDeadline(120L);
+   ReservationRequests reqs = new ReservationRequestsPBImpl();
+   reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+   // longer than arrival-deadline
+   ReservationRequest r1 = ReservationRequest.newInstance(
+       Resource.newInstance(1024, 1), 35, 5, 30);
+   // above max cluster size
+   ReservationRequest r2 = ReservationRequest.newInstance(
+       Resource.newInstance(1024, 1), 110, 110, 10);
+
+   List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+   list.add(r1);
+   list.add(r2);
+   reqs.setReservationResources(list);
+   rr.setReservationRequests(reqs);
+
+   ReservationId reservationID = ReservationSystemTestUtil
+       .getNewReservationId();
+   boolean result = false;
+   try {
+     // submit to agent; returning normally (even with false) is a failure
+     result = agent.createReservation(reservationID, "u1", plan, rr);
+     fail();
+   } catch (PlanningException p) {
+     // expected
+   }
+   // validate results: no alternative may be accepted and the plan must
+   // still contain only the two pre-existing reservations
+   assertFalse("Agent-based allocation should have failed", result);
+   assertTrue("Agent-based allocation should have failed", plan
+       .getAllReservations().size() == 2);
+
+   System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+       + reservationID + ")----------");
+   System.out.println(plan.toString());
+   System.out.println(plan.toCumulativeString());
+
+ }
+
+ @Test
+ public void testAll() throws PlanningException {
+ prepareBasicPlan();
+ // create an ALL request
+ ReservationDefinition rr = new ReservationDefinitionPBImpl();
+ rr.setArrival(100 * step);
+ rr.setDeadline(120 * step);
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 5, 5, 10 * step);
+ ReservationRequest r2 = ReservationRequest.newInstance(
+ Resource.newInstance(2048, 2), 10, 10, 20 * step);
+
+ List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+ list.add(r);
+ list.add(r2);
+ reqs.setReservationResources(list);
+ rr.setReservationRequests(reqs);
+
+ // submit to agent
+ ReservationId reservationID = ReservationSystemTestUtil
+ .getNewReservationId();
+ agent.createReservation(reservationID, "u1", plan, rr);
+
+ // validate results, we expect the second one to be accepted
+ assertTrue("Agent-based allocation failed", reservationID != null);
+ assertTrue("Agent-based allocation failed", plan.getAllReservations()
+ .size() == 3);
+
+ ReservationAllocation cs = plan.getReservationById(reservationID);
+
+ assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+ assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+
+ System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ + ")----------");
+ System.out.println(plan.toString());
+ System.out.println(plan.toCumulativeString());
+
+ }
+
+ /**
+  * Submits an R_ALL request whose combination of components cannot be
+  * satisfied and verifies that the agent rejects it with a
+  * PlanningException and leaves the plan unchanged.
+  */
+ @Test
+ public void testAllImpossible() throws PlanningException {
+   prepareBasicPlan();
+   // create an ALL request, with an impossible combination, it should be
+   // rejected, and allocation remain unchanged
+   ReservationDefinition rr = new ReservationDefinitionPBImpl();
+   rr.setArrival(100L);
+   rr.setDeadline(120L);
+   ReservationRequests reqs = new ReservationRequestsPBImpl();
+   reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+   ReservationRequest r = ReservationRequest.newInstance(
+       Resource.newInstance(1024, 1), 55, 5, 10);
+   ReservationRequest r2 = ReservationRequest.newInstance(
+       Resource.newInstance(2048, 2), 55, 5, 20);
+
+   List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+   list.add(r);
+   list.add(r2);
+   reqs.setReservationResources(list);
+   rr.setReservationRequests(reqs);
+
+   ReservationId reservationID = ReservationSystemTestUtil
+       .getNewReservationId();
+   boolean result = false;
+   try {
+     // submit to agent; returning normally (even with false) is a failure
+     result = agent.createReservation(reservationID, "u1", plan, rr);
+     fail();
+   } catch (PlanningException p) {
+     // expected
+   }
+
+   // validate results: the request must be rejected and the plan must still
+   // contain only the two pre-existing reservations
+   assertFalse("Agent-based allocation should have failed", result);
+   assertTrue("Agent-based allocation should have failed", plan
+       .getAllReservations().size() == 2);
+
+   System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+       + reservationID + ")----------");
+   System.out.println(plan.toString());
+   System.out.println(plan.toCumulativeString());
+
+ }
+
+ /**
+  * Seeds {@code plan} with two fixed "u1" reservations in the "dedicated"
+  * queue, starting at 0 and at 5000, so the agent under test works against
+  * a non-empty plan. Each insertion is asserted to succeed.
+  */
+ private void prepareBasicPlan() throws PlanningException {
+
+   // first reservation: per-step container counts starting at time 0
+   int[] firstShape = { 10, 10, 20, 20, 20, 10, 10 };
+   ReservationId firstId = ReservationSystemTestUtil.getNewReservationId();
+   Map<ReservationInterval, ReservationRequest> firstAlloc =
+       ReservationSystemTestUtil.generateAllocation(0, step, firstShape);
+   long firstEnd = 0L + firstShape.length * step;
+   assertTrue(plan.toString(),
+       plan.addReservation(new InMemoryReservationAllocation(firstId, null,
+           "u1", "dedicated", 0L, firstEnd, firstAlloc, res, minAlloc)));
+
+   // second reservation: a flat profile of 5 containers starting at 5000
+   int[] secondShape = { 5, 5, 5, 5, 5, 5, 5 };
+   Map<ReservationInterval, ReservationRequest> secondAlloc =
+       ReservationSystemTestUtil.generateAllocation(5000, step, secondShape);
+   ReservationId secondId = ReservationSystemTestUtil.getNewReservationId();
+   long secondEnd = 5000 + secondShape.length * step;
+   assertTrue(plan.toString(),
+       plan.addReservation(new InMemoryReservationAllocation(secondId, null,
+           "u1", "dedicated", 5000, secondEnd, secondAlloc, res, minAlloc)));
+
+   System.out.println("--------BEFORE AGENT----------");
+   System.out.println(plan.toString());
+   System.out.println(plan.toCumulativeString());
+ }
+
+ /**
+  * Returns true iff, at every tick t in [start, end), the allocation holds
+  * exactly {@code containers} x <mem, cores> (compared as the aggregate
+  * resource mem*containers / cores*containers). An empty range yields true.
+  */
+ private boolean check(ReservationAllocation cs, long start, long end,
+     int containers, int mem, int cores) {
+
+   for (long tick = start; tick < end; tick++) {
+     // stop at the first mismatching tick, as the short-circuit did before
+     if (!Resources.equals(cs.getResourcesAtTime(tick),
+         Resource.newInstance(mem * containers, cores * containers))) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Stress driver (not annotated as a JUnit test; invoked from main).
+  *
+  * <p>Replaces the {@code step} and {@code plan} fields with a large plan
+  * backed by a mocked CapacityScheduler. It then submits {@code numJobs}
+  * randomly generated reservation definitions through {@code agent} and
+  * prints how many were accepted and the elapsed time. A submission that
+  * throws PlanningException is counted as rejected.
+  *
+  * @param numJobs number of random reservation definitions to submit
+  * @throws PlanningException presumably from plan/policy setup (submission
+  *     failures are caught) — TODO confirm
+  * @throws IOException presumably from the scheduler mock setup — TODO
+  *     confirm
+  */
+ public void testStress(int numJobs) throws PlanningException, IOException {
+
+   long timeWindow = 1000000L;
+   Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
+   step = 1000L;
+   ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+   CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
+   String reservationQ = testUtil.getFullReservationQueueName();
+   CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
+   capConf.setReservationWindow(reservationQ, timeWindow);
+   capConf.setMaximumCapacity(reservationQ, 100);
+   capConf.setAverageCapacity(reservationQ, 100);
+   CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+   policy.init(reservationQ, capConf, new HashSet<String>());
+
+   plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
+       clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
+
+   int acc = 0;
+   List<ReservationDefinition> list =
+       new ArrayList<ReservationDefinition>(Math.max(numJobs, 0));
+   for (long i = 0; i < numJobs; i++) {
+     list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
+   }
+
+   // nanoTime is monotonic, unlike currentTimeMillis (wall clock), so it is
+   // the correct source for measuring an elapsed interval
+   long startNanos = System.nanoTime();
+   for (int i = 0; i < numJobs; i++) {
+
+     try {
+       if (agent.createReservation(
+           ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
+           plan, list.get(i))) {
+         acc++;
+       }
+     } catch (PlanningException p) {
+       // deliberately ignored: a rejected submission is just not counted
+     }
+   }
+
+   long elapsedMs = (System.nanoTime() - startNanos) / 1000000L;
+   System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
+       + " in " + elapsedMs + "ms");
+ }
+
+ /**
+  * Command-line entry point that runs {@link #testStress(int)}.
+  *
+  * @param arg optional; {@code arg[0]} is the number of random jobs to
+  *     submit (default 1000)
+  */
+ public static void main(String[] arg) {
+
+   // run a stress test with by default 1000 random jobs
+   int numJobs = 1000;
+   if (arg.length > 0) {
+     numJobs = Integer.parseInt(arg[0]);
+   }
+
+   try {
+     TestGreedyReservationAgent test = new TestGreedyReservationAgent();
+     test.setup();
+     test.testStress(numJobs);
+   } catch (Exception e) {
+     e.printStackTrace();
+     // report the failure to the invoking shell/script instead of exiting 0
+     System.exit(1);
+   }
+ }
+
+}