You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/09/01 22:17:00 UTC
[2/3] hadoop git commit: Plan/ResourceAllocation data structure
enhancements required to support recurring reservations in ReservationSystem.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 658387b..100d38c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.io.IOException;
-import java.io.StringWriter;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -33,8 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.gson.stream.JsonWriter;
-
/**
* This is a run length encoded sparse data structure that maintains resource
* allocations over time.
@@ -44,12 +41,14 @@ public class RLESparseResourceAllocation {
private static final int THRESHOLD = 100;
private static final Resource ZERO_RESOURCE = Resources.none();
- private NavigableMap<Long, Resource> cumulativeCapacity =
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected NavigableMap<Long, Resource> cumulativeCapacity =
new TreeMap<Long, Resource>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
- private final Lock readLock = readWriteLock.readLock();
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final ResourceCalculator resourceCalculator;
@@ -236,34 +235,6 @@ public class RLESparseResourceAllocation {
}
/**
- * Returns the JSON string representation of the current resources allocated
- * over time.
- *
- * @return the JSON string representation of the current resources allocated
- * over time
- */
- public String toMemJSONString() {
- StringWriter json = new StringWriter();
- JsonWriter jsonWriter = new JsonWriter(json);
- readLock.lock();
- try {
- jsonWriter.beginObject();
- // jsonWriter.name("timestamp").value("resource");
- for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
- jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
- }
- jsonWriter.endObject();
- jsonWriter.close();
- return json.toString();
- } catch (IOException e) {
- // This should not happen
- return "";
- } finally {
- readLock.unlock();
- }
- }
-
- /**
* Returns the representation of the current resources allocated over time as
* an interval map (in the defined non-null range).
*
@@ -304,7 +275,7 @@ public class RLESparseResourceAllocation {
public NavigableMap<Long, Resource> getCumulative() {
readLock.lock();
try {
- return cumulativeCapacity;
+ return Collections.unmodifiableNavigableMap(cumulativeCapacity);
} finally {
readLock.unlock();
}
@@ -437,8 +408,8 @@ public class RLESparseResourceAllocation {
Resource val = Resources.negate(e.getValue());
// test for negative value and throws
if (operator == RLEOperator.subtractTestNonNegative
- && (Resources.fitsIn(val, ZERO_RESOURCE) &&
- !Resources.equals(val, ZERO_RESOURCE))) {
+ && (Resources.fitsIn(val, ZERO_RESOURCE)
+ && !Resources.equals(val, ZERO_RESOURCE))) {
throw new PlanningException(
"RLESparseResourceAllocation: merge failed as the "
+ "resulting RLESparseResourceAllocation would be negative");
@@ -504,22 +475,29 @@ public class RLESparseResourceAllocation {
}
+ /**
+ * Get a {@link RLESparseResourceAllocation} view of the {@link Resource}
+ * allocations between the specified start and end times.
+ *
+ * @param start the time from which the {@link Resource} allocations are
+ * required
+ * @param end the time up to which the {@link Resource} allocations are
+ * required
+ * @return the overlapping allocations
+ */
public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
readLock.lock();
try {
NavigableMap<Long, Resource> a = this.getCumulative();
-
if (a != null && !a.isEmpty()) {
// include the portion of previous entry that overlaps start
if (start > a.firstKey()) {
long previous = a.floorKey(start);
a = a.tailMap(previous, true);
}
-
if (end < a.lastKey()) {
a = a.headMap(end, true);
}
-
}
RLESparseResourceAllocation ret =
new RLESparseResourceAllocation(a, resourceCalculator);
@@ -527,7 +505,33 @@ public class RLESparseResourceAllocation {
} finally {
readLock.unlock();
}
+ }
+ /**
+ * This method shifts the timestamps of all the {@link Resource} entries by
+ * the specified "delta".
+ *
+ * @param delta the time by which to shift the {@link Resource} allocations
+ */
+ public void shift(long delta) {
+ writeLock.lock();
+ try {
+ TreeMap<Long, Resource> newCum = new TreeMap<>();
+ long start;
+ for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+ if (delta > 0) {
+ start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE
+ : entry.getKey() + delta;
+ } else {
+ start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE
+ : entry.getKey() + delta;
+ }
+ newCum.put(start, entry.getValue());
+ }
+ cumulativeCapacity = newCum;
+ } finally {
+ writeLock.unlock();
+ }
}
/**
@@ -541,8 +545,8 @@ public class RLESparseResourceAllocation {
/**
* Get the maximum capacity across specified time instances. The search-space
* is specified using the starting value, tick, and the periodic interval for
- * search. Maximum resource allocation across tick, tick + period,
- * tick + 2 * period,..., tick + n * period .. is returned.
+ * search. Maximum resource allocation across tick, tick + period, tick + 2 *
+ * period,..., tick + n * period .. is returned.
*
* @param tick the starting time instance
* @param period interval at which capacity is evaluated
@@ -550,14 +554,19 @@ public class RLESparseResourceAllocation {
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxCapacity = ZERO_RESOURCE;
- if (!cumulativeCapacity.isEmpty()) {
- Long lastKey = cumulativeCapacity.lastKey();
- for (long t = tick; t <= lastKey; t = t + period) {
- maxCapacity = Resources.componentwiseMax(maxCapacity,
- cumulativeCapacity.floorEntry(t).getValue());
+ readLock.lock();
+ try {
+ if (!cumulativeCapacity.isEmpty()) {
+ Long lastKey = cumulativeCapacity.lastKey();
+ for (long t = tick; t <= lastKey; t = t + period) {
+ maxCapacity = Resources.componentwiseMax(maxCapacity,
+ cumulativeCapacity.floorEntry(t).getValue());
+ }
}
+ return maxCapacity;
+ } finally {
+ readLock.unlock();
}
- return maxCapacity;
}
/**
@@ -567,17 +576,17 @@ public class RLESparseResourceAllocation {
* @return minimum resource allocation
*/
public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
- Resource minCapacity = Resource.newInstance(
- Integer.MAX_VALUE, Integer.MAX_VALUE);
+ Resource minCapacity =
+ Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE);
long start = interval.getStartTime();
long end = interval.getEndTime();
NavigableMap<Long, Resource> capacityRange =
- this.getRangeOverlapping(start, end).getCumulative();
+ getRangeOverlapping(start, end).getCumulative();
if (!capacityRange.isEmpty()) {
for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
if (entry.getValue() != null) {
- minCapacity = Resources.componentwiseMin(minCapacity,
- entry.getValue());
+ minCapacity =
+ Resources.componentwiseMin(minCapacity, entry.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
index 0da95ac..bb4a7fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -24,14 +24,16 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* A ReservationAllocation represents a concrete allocation of resources over
* time that satisfy a certain {@link ReservationDefinition}. This is used
* internally by a {@link Plan} to store information about how each of the
* accepted {@link ReservationDefinition} have been allocated.
*/
-public interface ReservationAllocation extends
- Comparable<ReservationAllocation> {
+public interface ReservationAllocation
+ extends Comparable<ReservationAllocation> {
/**
* Returns the unique identifier {@link ReservationId} that represents the
@@ -40,28 +42,28 @@ public interface ReservationAllocation extends
* @return reservationId the unique identifier {@link ReservationId} that
* represents the reservation
*/
- public ReservationId getReservationId();
+ ReservationId getReservationId();
/**
* Returns the original {@link ReservationDefinition} submitted by the client
*
* @return the {@link ReservationDefinition} submitted by the client
*/
- public ReservationDefinition getReservationDefinition();
+ ReservationDefinition getReservationDefinition();
/**
* Returns the time at which the reservation is activated.
*
* @return the time at which the reservation is activated
*/
- public long getStartTime();
+ long getStartTime();
/**
* Returns the time at which the reservation terminates.
*
* @return the time at which the reservation terminates
*/
- public long getEndTime();
+ long getEndTime();
/**
* Returns the map of resources requested against the time interval for which
@@ -70,28 +72,28 @@ public interface ReservationAllocation extends
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
- public Map<ReservationInterval, Resource> getAllocationRequests();
+ Map<ReservationInterval, Resource> getAllocationRequests();
/**
* Return a string identifying the plan to which the reservation belongs
*
* @return the plan to which the reservation belongs
*/
- public String getPlanName();
+ String getPlanName();
/**
* Returns the user who requested the reservation
*
* @return the user who requested the reservation
*/
- public String getUser();
+ String getUser();
/**
* Returns whether the reservation has gang semantics or not
*
* @return true if there is a gang request, false otherwise
*/
- public boolean containsGangs();
+ boolean containsGangs();
/**
* Sets the time at which the reservation was accepted by the system
@@ -99,14 +101,14 @@ public interface ReservationAllocation extends
* @param acceptedAt the time at which the reservation was accepted by the
* system
*/
- public void setAcceptanceTimestamp(long acceptedAt);
+ void setAcceptanceTimestamp(long acceptedAt);
/**
* Returns the time at which the reservation was accepted by the system
*
* @return the time at which the reservation was accepted by the system
*/
- public long getAcceptanceTime();
+ long getAcceptanceTime();
/**
* Returns the capacity represented by cumulative resources reserved by the
@@ -116,12 +118,42 @@ public interface ReservationAllocation extends
* requested
* @return the resources reserved at the specified time
*/
- public Resource getResourcesAtTime(long tick);
+ Resource getResourcesAtTime(long tick);
+
+ /**
+ * Return a RLE representation of used resources.
+ *
+ * @return a RLE encoding of resources allocated over time.
+ */
+ RLESparseResourceAllocation getResourcesOverTime();
+
/**
* Return a RLE representation of used resources.
+ *
+ * @param start start of the time interval.
+ * @param end end of the time interval.
* @return a RLE encoding of resources allocated over time.
*/
- public RLESparseResourceAllocation getResourcesOverTime();
+ RLESparseResourceAllocation getResourcesOverTime(long start, long end);
+
+ /**
+ * Get the periodicity of this reservation representing the time period of the
+ * periodic job. Period is represented in milliseconds for periodic jobs.
+ * Period is 0 for non-periodic jobs.
+ *
+ * @return periodicity of this reservation
+ */
+ long getPeriodicity();
+
+ /**
+ * Set the periodicity of this reservation representing the time period of the
+ * periodic job. Period is represented in milliseconds for periodic jobs.
+ * Period is 0 for non-periodic jobs.
+ *
+ * @param period periodicity of this reservation
+ */
+ @VisibleForTesting
+ void setPeriodicity(long period);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
index 027d066..a66d222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -44,6 +44,8 @@ public class ReservationInputValidator {
/**
* Utility class to validate reservation requests.
+ *
+ * @param clock the {@link Clock} to use
*/
public ReservationInputValidator(Clock clock) {
this.clock = clock;
@@ -53,22 +55,21 @@ public class ReservationInputValidator {
ReservationId reservationId, String auditConstant) throws YarnException {
// check if the reservation id is valid
if (reservationId == null) {
- String message =
- "Missing reservation id."
- + " Please try again by specifying a reservation id.";
+ String message = "Missing reservation id."
+ + " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queue = reservationSystem.getQueueForReservation(reservationId);
String nullQueueErrorMessage =
- "The specified reservation with ID: " + reservationId
- + " is unknown. Please try again with a valid reservation.";
+ "The specified reservation with ID: " + reservationId
+ + " is unknown. Please try again with a valid reservation.";
String nullPlanErrorMessage = "The specified reservation: " + reservationId
- + " is not associated with any valid plan."
- + " Please try again with a valid reservation.";
+ + " is not associated with any valid plan."
+ + " Please try again with a valid reservation.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
- nullQueueErrorMessage, nullPlanErrorMessage);
+ nullQueueErrorMessage, nullPlanErrorMessage);
}
private void validateReservationDefinition(ReservationId reservationId,
@@ -77,17 +78,15 @@ public class ReservationInputValidator {
String message = "";
// check if deadline is in the past
if (contract == null) {
- message =
- "Missing reservation definition."
- + " Please try again by specifying a reservation definition.";
+ message = "Missing reservation definition."
+ + " Please try again by specifying a reservation definition.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
if (contract.getDeadline() <= clock.getTime()) {
- message =
- "The specified deadline: " + contract.getDeadline()
- + " is the past. Please try again with deadline in the future.";
+ message = "The specified deadline: " + contract.getDeadline()
+ + " is the past. Please try again with deadline in the future.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -95,18 +94,16 @@ public class ReservationInputValidator {
// Check if at least one RR has been specified
ReservationRequests resReqs = contract.getReservationRequests();
if (resReqs == null) {
- message =
- "No resources have been specified to reserve."
- + "Please try again by specifying the resources to reserve.";
+ message = "No resources have been specified to reserve."
+ + "Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
List<ReservationRequest> resReq = resReqs.getReservationResources();
if (resReq == null || resReq.isEmpty()) {
- message =
- "No resources have been specified to reserve."
- + " Please try again by specifying the resources to reserve.";
+ message = "No resources have been specified to reserve."
+ + " Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -123,22 +120,18 @@ public class ReservationInputValidator {
} else {
minDuration += rr.getDuration();
}
- maxGangSize =
- Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
- maxGangSize,
- Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+ maxGangSize = Resources.max(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), maxGangSize,
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()));
}
// verify the allocation is possible (skip for ANY)
long duration = contract.getDeadline() - contract.getArrival();
- if (duration < minDuration
- && type != ReservationRequestInterpreter.R_ANY) {
- message =
- "The time difference ("
- + (duration)
- + ") between arrival (" + contract.getArrival() + ") "
- + "and deadline (" + contract.getDeadline() + ") must "
- + " be greater or equal to the minimum resource duration ("
- + minDuration + ")";
+ if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) {
+ message = "The time difference (" + (duration) + ") between arrival ("
+ + contract.getArrival() + ") " + "and deadline ("
+ + contract.getDeadline() + ") must "
+ + " be greater or equal to the minimum resource duration ("
+ + minDuration + ")";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -148,10 +141,9 @@ public class ReservationInputValidator {
if (Resources.greaterThan(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
&& type != ReservationRequestInterpreter.R_ANY) {
- message =
- "The size of the largest gang in the reservation definition ("
- + maxGangSize + ") exceed the capacity available ("
- + plan.getTotalCapacity() + " )";
+ message = "The size of the largest gang in the reservation definition ("
+ + maxGangSize + ") exceed the capacity available ("
+ + plan.getTotalCapacity() + " )";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
@@ -179,32 +171,32 @@ public class ReservationInputValidator {
}
}
- private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
- queue, String auditConstant) throws YarnException {
+ private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+ String queue, String auditConstant) throws YarnException {
String nullQueueErrorMessage = "The queue is not specified."
- + " Please try again with a valid reservable queue.";
+ + " Please try again with a valid reservable queue.";
String nullPlanErrorMessage = "The specified queue: " + queue
- + " is not managed by reservation system."
- + " Please try again with a valid reservable queue.";
+ + " is not managed by reservation system."
+ + " Please try again with a valid reservable queue.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
- nullQueueErrorMessage, nullPlanErrorMessage);
+ nullQueueErrorMessage, nullPlanErrorMessage);
}
- private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
- queue, String auditConstant, String nullQueueErrorMessage,
- String nullPlanErrorMessage) throws YarnException {
+ private Plan getPlanFromQueue(ReservationSystem reservationSystem,
+ String queue, String auditConstant, String nullQueueErrorMessage,
+ String nullPlanErrorMessage) throws YarnException {
if (queue == null || queue.isEmpty()) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
- "validate reservation input", "ClientRMService",
- nullQueueErrorMessage);
+ "validate reservation input", "ClientRMService",
+ nullQueueErrorMessage);
throw RPCUtil.getRemoteException(nullQueueErrorMessage);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queue);
if (plan == null) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
- "validate reservation input", "ClientRMService",
- nullPlanErrorMessage);
+ "validate reservation input", "ClientRMService",
+ nullPlanErrorMessage);
throw RPCUtil.getRemoteException(nullPlanErrorMessage);
}
return plan;
@@ -222,22 +214,21 @@ public class ReservationInputValidator {
* @param reservationId the {@link ReservationId} associated with the current
* request
* @return the {@link Plan} to submit the request to
- * @throws YarnException
+ * @throws YarnException if validation fails
*/
public Plan validateReservationSubmissionRequest(
- ReservationSystem reservationSystem,
- ReservationSubmissionRequest request, ReservationId reservationId)
- throws YarnException {
+ ReservationSystem reservationSystem, ReservationSubmissionRequest request,
+ ReservationId reservationId) throws YarnException {
String message;
if (reservationId == null) {
- message = "Reservation id cannot be null. Please try again " +
- "specifying a valid reservation id by creating a new reservation id.";
+ message = "Reservation id cannot be null. Please try again specifying "
+ + " a valid reservation id by creating a new reservation id.";
throw RPCUtil.getRemoteException(message);
}
// Check if it is a managed queue
String queue = request.getQueue();
Plan plan = getPlanFromQueue(reservationSystem, queue,
- AuditConstants.SUBMIT_RESERVATION_REQUEST);
+ AuditConstants.SUBMIT_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
@@ -255,15 +246,14 @@ public class ReservationInputValidator {
* @param request the {@link ReservationUpdateRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
- * @throws YarnException
+ * @throws YarnException if validation fails
*/
public Plan validateReservationUpdateRequest(
ReservationSystem reservationSystem, ReservationUpdateRequest request)
throws YarnException {
ReservationId reservationId = request.getReservationId();
- Plan plan =
- validateReservation(reservationSystem, reservationId,
- AuditConstants.UPDATE_RESERVATION_REQUEST);
+ Plan plan = validateReservation(reservationSystem, reservationId,
+ AuditConstants.UPDATE_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.UPDATE_RESERVATION_REQUEST);
@@ -278,28 +268,26 @@ public class ReservationInputValidator {
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationListRequest} defining search
- * parameters for reservations in the {@link ReservationSystem}
- * that is being validated against.
+ * parameters for reservations in the {@link ReservationSystem} that
+ * is being validated against.
* @return the {@link Plan} to list reservations of.
- * @throws YarnException
+ * @throws YarnException if validation fails
*/
public Plan validateReservationListRequest(
- ReservationSystem reservationSystem,
- ReservationListRequest request)
+ ReservationSystem reservationSystem, ReservationListRequest request)
throws YarnException {
String queue = request.getQueue();
if (request.getEndTime() < request.getStartTime()) {
- String errorMessage = "The specified end time must be greater than " +
- "the specified start time.";
+ String errorMessage = "The specified end time must be greater than "
+ + "the specified start time.";
RMAuditLogger.logFailure("UNKNOWN",
- AuditConstants.LIST_RESERVATION_REQUEST,
- "validate list reservation input", "ClientRMService",
- errorMessage);
+ AuditConstants.LIST_RESERVATION_REQUEST,
+ "validate list reservation input", "ClientRMService", errorMessage);
throw RPCUtil.getRemoteException(errorMessage);
}
// Check if it is a managed queue
return getPlanFromQueue(reservationSystem, queue,
- AuditConstants.LIST_RESERVATION_REQUEST);
+ AuditConstants.LIST_RESERVATION_REQUEST);
}
/**
@@ -312,7 +300,7 @@ public class ReservationInputValidator {
* @param request the {@link ReservationDeleteRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
- * @throws YarnException
+ * @throws YarnException if validation fails
*/
public Plan validateReservationDeleteRequest(
ReservationSystem reservationSystem, ReservationDeleteRequest request)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index 8b62972..a6c8fcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -29,8 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
-import java.util.Map;
-
/**
* This interface is the one implemented by any system that wants to support
* Reservations i.e. make {@code Resource} allocations in future. Implementors
@@ -57,7 +57,7 @@ public interface ReservationSystem extends Recoverable {
*
* @param conf configuration
* @param rmContext current context of the {@code ResourceManager}
- * @throws YarnException
+ * @throws YarnException if initialization of the configured plan fails
*/
void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
index e458055..cbf0f38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java
@@ -38,7 +38,7 @@ public interface SharingPolicy {
* @param planQueuePath the name of the queue for this plan
* @param conf the system configuration
*/
- public void init(String planQueuePath, ReservationSchedulerConfiguration conf);
+ void init(String planQueuePath, ReservationSchedulerConfiguration conf);
/**
* This method runs the policy validation logic, and return true/false on
@@ -51,7 +51,7 @@ public interface SharingPolicy {
* @throws PlanningException if the policy is respected if we add this
* {@link ReservationAllocation} to the {@link Plan}
*/
- public void validate(Plan plan, ReservationAllocation newAllocation)
+ void validate(Plan plan, ReservationAllocation newAllocation)
throws PlanningException;
/**
@@ -68,9 +68,13 @@ public interface SharingPolicy {
* @param start the start time for the range we are querying
* @param end the end time for the range we are querying
* @param oldId (optional) the id of a reservation being updated
+ *
+ * @return the available resources expressed as a
+ * {@link RLESparseResourceAllocation}
+ *
* @throws PlanningException throws if the request is not valid
*/
- public RLESparseResourceAllocation availableResources(
+ RLESparseResourceAllocation availableResources(
RLESparseResourceAllocation available, Plan plan, String user,
ReservationId oldId, long start, long end) throws PlanningException;
@@ -82,7 +86,6 @@ public interface SharingPolicy {
*
* @return validWindow the window of validity considered by the policy.
*/
- public long getValidWindow();
-
+ long getValidWindow();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
index abac6ac..af0e712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -34,7 +34,7 @@ public interface Planner {
*
* @param plan the {@link Plan} to replan
* @param contracts the list of reservation requests
- * @throws PlanningException
+ * @throws PlanningException if operation is unsuccessful
*/
public void plan(Plan plan, List<ReservationDefinition> contracts)
throws PlanningException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 199bfa5..bbbf0d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -50,7 +50,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
* @return whether the allocateUser function was successful or not
*
* @throws PlanningException if the session cannot be fitted into the plan
- * @throws ContractValidationException
+ * @throws ContractValidationException if validation fails
*/
protected boolean allocateUser(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index ec6d9c0..8934b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -50,7 +50,7 @@ public interface StageAllocator {
*
* @return The computed allocation (or null if the stage could not be
* allocated)
- * @throws PlanningException
+ * @throws PlanningException if operation is unsuccessful
*/
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index da04336..d107487 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
RLESparseResourceAllocation netAvailable =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
- stageDeadline);
+ stageDeadline, 0);
netAvailable =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index ec83e02..ae7d91a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -83,9 +83,8 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
// get available resources from plan
- RLESparseResourceAllocation netRLERes =
- plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
- stageDeadline);
+ RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+ user, oldId, stageEarliestStart, stageDeadline, 0);
// remove plan modifications
netRLERes =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index e45f58c..c014549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -77,8 +77,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
- RLESparseResourceAllocation netRLERes = plan
- .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
+ RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
+ user, oldId, stageArrival, stageDeadline, 0);
long step = plan.getStep();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index e99842e..5337e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySetOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.FileWriter;
import java.io.IOException;
@@ -76,7 +79,8 @@ public class ReservationSystemTestUtil {
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
- ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
+ ReservationSchedulerConfiguration realConf =
+ new CapacitySchedulerConfiguration();
ReservationSchedulerConfiguration conf = spy(realConf);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ))
@@ -168,7 +172,6 @@ public class ReservationSystemTestUtil {
scheduler.start();
scheduler.reinitialize(conf, rmContext);
-
Resource resource =
ReservationSystemTestUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
@@ -184,10 +187,16 @@ public class ReservationSystemTestUtil {
public static ReservationDefinition createSimpleReservationDefinition(
long arrival, long deadline, long duration, int parallelism) {
+ return createSimpleReservationDefinition(arrival, deadline, duration,
+ parallelism, null);
+ }
+
+ public static ReservationDefinition createSimpleReservationDefinition(
+ long arrival, long deadline, long duration, int parallelism,
+ String recurrenceExpression) {
// create a request with a single atomic ask
- ReservationRequest r =
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- parallelism, parallelism, duration);
+ ReservationRequest r = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), parallelism, parallelism, duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
@@ -195,32 +204,31 @@ public class ReservationSystemTestUtil {
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
+ if (recurrenceExpression != null) {
+ rDef.setRecurrenceExpression(recurrenceExpression);
+ }
return rDef;
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
- return createSimpleReservationRequest(reservationId, numContainers,
- arrival, deadline, duration, Priority.UNDEFINED);
+ return createSimpleReservationRequest(reservationId, numContainers, arrival,
+ deadline, duration, Priority.UNDEFINED);
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration, Priority priority) {
// create a request with a single atomic ask
- ReservationRequest r =
- ReservationRequest.newInstance(Resource.newInstance(1024, 1),
- numContainers, 1, duration);
- ReservationRequests reqs =
- ReservationRequests.newInstance(Collections.singletonList(r),
- ReservationRequestInterpreter.R_ALL);
- ReservationDefinition rDef =
- ReservationDefinition.newInstance(arrival, deadline, reqs,
- "testClientRMService#reservation", "0", priority);
- ReservationSubmissionRequest request =
- ReservationSubmissionRequest.newInstance(rDef,
- reservationQ, reservationId);
+ ReservationRequest r = ReservationRequest
+ .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration);
+ ReservationRequests reqs = ReservationRequests.newInstance(
+ Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+ deadline, reqs, "testClientRMService#reservation", "0", priority);
+ ReservationSubmissionRequest request = ReservationSubmissionRequest
+ .newInstance(rDef, reservationQ, reservationId);
return request;
}
@@ -252,9 +260,9 @@ public class ReservationSystemTestUtil {
return cs;
}
- @SuppressWarnings("rawtypes") public static void initializeRMContext(
- int numContainers, AbstractYarnScheduler scheduler,
- RMContext mockRMContext) {
+ @SuppressWarnings("rawtypes")
+ public static void initializeRMContext(int numContainers,
+ AbstractYarnScheduler scheduler, RMContext mockRMContext) {
when(mockRMContext.getScheduler()).thenReturn(scheduler);
Resource r = calculateClusterResource(numContainers);
@@ -262,26 +270,25 @@ public class ReservationSystemTestUtil {
}
public static RMContext createRMContext(Configuration conf) {
- RMContext mockRmContext = Mockito.spy(
- new RMContextImpl(null, null, null, null, null, null,
- new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
+ RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null,
+ null, null, null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null));
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
- any(Resource.class))).thenAnswer(new Answer<Resource>() {
- @Override public Resource answer(InvocationOnMock invocation)
- throws Throwable {
- Object[] args = invocation.getArguments();
- return (Resource) args[2];
- }
- });
+ any(Resource.class))).thenAnswer(new Answer<Resource>() {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return (Resource) args[2];
+ }
+ });
when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
.thenAnswer(new Answer<Resource>() {
- @Override public Resource answer(InvocationOnMock invocation)
- throws Throwable {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return (Resource) args[1];
}
@@ -304,9 +311,8 @@ public class ReservationSystemTestUtil {
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
- final String dedicated =
- CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
- + reservationQ;
+ final String dedicated = CapacitySchedulerConfiguration.ROOT
+ + CapacitySchedulerConfiguration.DOT + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservable(dedicated, true);
@@ -405,26 +411,55 @@ public class ReservationSystemTestUtil {
public static Map<ReservationInterval, Resource> generateAllocation(
long startTime, long step, int[] alloc) {
+ return generateAllocation(startTime, step, alloc, null);
+ }
+
+ public static Map<ReservationInterval, Resource> generateAllocation(
+ long startTime, long step, int[] alloc, String recurrenceExpression) {
Map<ReservationInterval, Resource> req = new TreeMap<>();
- for (int i = 0; i < alloc.length; i++) {
- req.put(new ReservationInterval(startTime + i * step,
- startTime + (i + 1) * step), ReservationSystemUtil.toResource(
- ReservationRequest
- .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+ long period = 0;
+ if (recurrenceExpression != null) {
+ period = Long.parseLong(recurrenceExpression);
+ }
+
+ long rStart;
+ long rEnd;
+ for (int j = 0; j < 86400000; j += period) {
+ for (int i = 0; i < alloc.length; i++) {
+ rStart = (startTime + i * step) + j * period;
+ rEnd = (startTime + (i + 1) * step) + j * period;
+ if (period > 0) {
+ rStart = rStart % period + j * period;
+ rEnd = rEnd % period + j * period;
+ if (rStart > rEnd) {
+ // skip wrap-around entry
+ continue;
+ }
+ }
+
+ req.put(new ReservationInterval(rStart, rEnd),
+ ReservationSystemUtil.toResource(ReservationRequest
+ .newInstance(Resource.newInstance(1024, 1), alloc[i])));
+
+ }
+ // execute only once if non-periodic
+ if (period == 0) {
+ break;
+ }
}
return req;
}
- public static RLESparseResourceAllocation
- generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
+ public static RLESparseResourceAllocation generateRLESparseResourceAllocation(
+ int[] alloc, long[] timeSteps) {
TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
for (int i = 0; i < alloc.length; i++) {
allocationsMap.put(timeSteps[i],
Resource.newInstance(alloc[i], alloc[i]));
}
- RLESparseResourceAllocation rleVector =
- new RLESparseResourceAllocation(allocationsMap,
- new DefaultResourceCalculator());
+ RLESparseResourceAllocation rleVector = new RLESparseResourceAllocation(
+ allocationsMap, new DefaultResourceCalculator());
return rleVector;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org