You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:37 UTC
[22/33] git commit: YARN-1709. In-memory data structures used to
track resources over time to enable reservations. (cherry picked from commit
0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3) (cherry picked from commit
cf4b34282aafee9f6b09d3433c4de1ae4b359168) (
YARN-1709. In-memory data structures used to track resources over time to enable reservations.
(cherry picked from commit 0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3)
(cherry picked from commit cf4b34282aafee9f6b09d3433c4de1ae4b359168)
(cherry picked from commit 63250ef9d6410f5db20dfe421870c655e2c0fd65)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/056b7f57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/056b7f57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/056b7f57
Branch: refs/heads/branch-2.6
Commit: 056b7f5799dc8ff98f534cd833b173bf004a1556
Parents: f4522fd
Author: subru <su...@outlook.com>
Authored: Fri Sep 12 17:22:08 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:29:13 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 3 +
.../reservation/InMemoryPlan.java | 507 +++++++++++++++++++
.../InMemoryReservationAllocation.java | 151 ++++++
.../resourcemanager/reservation/Plan.java | 32 ++
.../reservation/PlanContext.java | 101 ++++
.../resourcemanager/reservation/PlanEdit.java | 61 +++
.../resourcemanager/reservation/PlanView.java | 89 ++++
.../RLESparseResourceAllocation.java | 332 ++++++++++++
.../reservation/ReservationAllocation.java | 104 ++++
.../reservation/ReservationInterval.java | 67 +++
.../exceptions/PlanningException.java | 25 +
.../reservation/ReservationSystemTestUtil.java | 210 ++++++++
.../reservation/TestInMemoryPlan.java | 477 +++++++++++++++++
.../TestInMemoryReservationAllocation.java | 206 ++++++++
.../TestRLESparseResourceAllocation.java | 169 +++++++
15 files changed, 2534 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index a7c08a0..410d974 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
+
+YARN-1709. In-memory data structures used to track resources over time to
+enable reservations. (subru)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
new file mode 100644
index 0000000..99231c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -0,0 +1,507 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryPlan implements Plan {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
+
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+ private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
+ new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
+
+ private RLESparseResourceAllocation rleSparseVector;
+
+ private Map<String, RLESparseResourceAllocation> userResourceAlloc =
+ new HashMap<String, RLESparseResourceAllocation>();
+
+ private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
+ new HashMap<ReservationId, InMemoryReservationAllocation>();
+
+ private final ReentrantReadWriteLock readWriteLock =
+ new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+ private final SharingPolicy policy;
+ private final ReservationAgent agent;
+ private final long step;
+ private final ResourceCalculator resCalc;
+ private final Resource minAlloc, maxAlloc;
+ private final String queueName;
+ private final QueueMetrics queueMetrics;
+ private final Planner replanner;
+ private final boolean getMoveOnExpiry;
+ private final Clock clock;
+
+ private Resource totalCapacity;
+
+ InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ ReservationAgent agent, Resource totalCapacity, long step,
+ ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+ String queueName, Planner replanner, boolean getMoveOnExpiry) {
+ this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
+ maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
+ }
+
+ InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ ReservationAgent agent, Resource totalCapacity, long step,
+ ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+ String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
+ this.queueMetrics = queueMetrics;
+ this.policy = policy;
+ this.agent = agent;
+ this.step = step;
+ this.totalCapacity = totalCapacity;
+ this.resCalc = resCalc;
+ this.minAlloc = minAlloc;
+ this.maxAlloc = maxAlloc;
+ this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
+ this.queueName = queueName;
+ this.replanner = replanner;
+ this.getMoveOnExpiry = getMoveOnExpiry;
+ this.clock = clock;
+ }
+
+ @Override
+ public QueueMetrics getQueueMetrics() {
+ return queueMetrics;
+ }
+
+ private void incrementAllocation(ReservationAllocation reservation) {
+ assert (readWriteLock.isWriteLockedByCurrentThread());
+ Map<ReservationInterval, ReservationRequest> allocationRequests =
+ reservation.getAllocationRequests();
+ // check if we have encountered the user earlier and if not add an entry
+ String user = reservation.getUser();
+ RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+ if (resAlloc == null) {
+ resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
+ userResourceAlloc.put(user, resAlloc);
+ }
+ for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+ .entrySet()) {
+ resAlloc.addInterval(r.getKey(), r.getValue());
+ rleSparseVector.addInterval(r.getKey(), r.getValue());
+ }
+ }
+
+ /**
+  * Subtracts every interval of the given reservation from the per-user
+  * allocation and from the plan-wide cumulative allocation, and drops the
+  * per-user entry once it becomes empty. Must be invoked while the current
+  * thread holds the write lock.
+  *
+  * @param reservation the reservation whose allocation requests are released
+  */
+ private void decrementAllocation(ReservationAllocation reservation) {
+   assert (readWriteLock.isWriteLockedByCurrentThread());
+   Map<ReservationInterval, ReservationRequest> allocationRequests =
+       reservation.getAllocationRequests();
+   String user = reservation.getUser();
+   RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+   for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+       .entrySet()) {
+     resAlloc.removeInterval(r.getKey(), r.getValue());
+     rleSparseVector.removeInterval(r.getKey(), r.getValue());
+   }
+   if (resAlloc.isEmpty()) {
+     // userResourceAlloc is keyed by user name: removing by the allocation
+     // object never matches a key and would leave the empty entry behind
+     userResourceAlloc.remove(user);
+   }
+ }
+
+ /**
+  * Returns a newly built set containing every reservation currently held
+  * in the plan, regardless of its interval.
+  *
+  * @return the flattened set of reservations, or null if the backing map
+  *         is null
+  */
+ public Set<ReservationAllocation> getAllReservations() {
+   readLock.lock();
+   try {
+     if (currentReservations == null) {
+       return null;
+     }
+     final Set<ReservationAllocation> everything =
+         new HashSet<ReservationAllocation>();
+     for (Set<InMemoryReservationAllocation> sameInterval : currentReservations
+         .values()) {
+       everything.addAll(sameInterval);
+     }
+     return everything;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ @Override
+ public boolean addReservation(ReservationAllocation reservation)
+ throws PlanningException {
+ // Verify the allocation is memory based otherwise it is not supported
+ InMemoryReservationAllocation inMemReservation =
+ (InMemoryReservationAllocation) reservation;
+ if (inMemReservation.getUser() == null) {
+ String errMsg =
+ "The specified Reservation with ID "
+ + inMemReservation.getReservationId()
+ + " is not mapped to any user";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ writeLock.lock();
+ try {
+ if (reservationTable.containsKey(inMemReservation.getReservationId())) {
+ String errMsg =
+ "The specified Reservation with ID "
+ + inMemReservation.getReservationId() + " already exists";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ // Validate if we can accept this reservation, throws exception if
+ // validation fails
+ policy.validate(this, inMemReservation);
+ // we record here the time in which the allocation has been accepted
+ reservation.setAcceptanceTimestamp(clock.getTime());
+ ReservationInterval searchInterval =
+ new ReservationInterval(inMemReservation.getStartTime(),
+ inMemReservation.getEndTime());
+ Set<InMemoryReservationAllocation> reservations =
+ currentReservations.get(searchInterval);
+ if (reservations == null) {
+ reservations = new HashSet<InMemoryReservationAllocation>();
+ }
+ if (!reservations.add(inMemReservation)) {
+ LOG.error("Unable to add reservation: {} to plan.",
+ inMemReservation.getReservationId());
+ return false;
+ }
+ currentReservations.put(searchInterval, reservations);
+ reservationTable.put(inMemReservation.getReservationId(),
+ inMemReservation);
+ incrementAllocation(inMemReservation);
+ LOG.info("Sucessfully added reservation: {} to plan.",
+ inMemReservation.getReservationId());
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean updateReservation(ReservationAllocation reservation)
+ throws PlanningException {
+ writeLock.lock();
+ boolean result = false;
+ try {
+ ReservationId resId = reservation.getReservationId();
+ ReservationAllocation currReservation = getReservationById(resId);
+ if (currReservation == null) {
+ String errMsg =
+ "The specified Reservation with ID " + resId
+ + " does not exist in the plan";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ if (!removeReservation(currReservation)) {
+ LOG.error("Unable to replace reservation: {} from plan.",
+ reservation.getReservationId());
+ return result;
+ }
+ try {
+ result = addReservation(reservation);
+ } catch (PlanningException e) {
+ LOG.error("Unable to update reservation: {} from plan due to {}.",
+ reservation.getReservationId(), e.getMessage());
+ }
+ if (result) {
+ LOG.info("Sucessfully updated reservation: {} in plan.",
+ reservation.getReservationId());
+ return result;
+ } else {
+ // rollback delete
+ addReservation(currReservation);
+ LOG.info("Rollbacked update reservation: {} from plan.",
+ reservation.getReservationId());
+ return result;
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private boolean removeReservation(ReservationAllocation reservation) {
+ assert (readWriteLock.isWriteLockedByCurrentThread());
+ ReservationInterval searchInterval =
+ new ReservationInterval(reservation.getStartTime(),
+ reservation.getEndTime());
+ Set<InMemoryReservationAllocation> reservations =
+ currentReservations.get(searchInterval);
+ if (reservations != null) {
+ if (!reservations.remove(reservation)) {
+ LOG.error("Unable to remove reservation: {} from plan.",
+ reservation.getReservationId());
+ return false;
+ }
+ if (reservations.isEmpty()) {
+ currentReservations.remove(searchInterval);
+ }
+ } else {
+ String errMsg =
+ "The specified Reservation with ID " + reservation.getReservationId()
+ + " does not exist in the plan";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ reservationTable.remove(reservation.getReservationId());
+ decrementAllocation(reservation);
+ LOG.info("Sucessfully deleted reservation: {} in plan.",
+ reservation.getReservationId());
+ return true;
+ }
+
+ /**
+  * Deletes the reservation stored under the given ID.
+  *
+  * @param reservationID the ID of the reservation to delete
+  * @return the outcome of removing the reservation from the plan
+  * @throws IllegalArgumentException if no reservation with the given ID
+  *           exists in the plan
+  */
+ @Override
+ public boolean deleteReservation(ReservationId reservationID) {
+   writeLock.lock();
+   try {
+     final ReservationAllocation target = getReservationById(reservationID);
+     if (target != null) {
+       return removeReservation(target);
+     }
+     final String errMsg = "The specified Reservation with ID "
+         + reservationID + " does not exist in the plan";
+     LOG.error(errMsg);
+     throw new IllegalArgumentException(errMsg);
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ /**
+  * Removes the reservations whose end time is at or before
+  * {@code tick - policy.getValidWindow()}. Candidates are collected under
+  * the read lock; the write lock is taken only if there is something to
+  * delete.
+  *
+  * @param tick the time from which the archival window is computed
+  */
+ @Override
+ public void archiveCompletedReservations(long tick) {
+   LOG.debug("Running archival at time: {}", tick);
+   List<InMemoryReservationAllocation> expiredReservations =
+       new ArrayList<InMemoryReservationAllocation>();
+   // Phase 1: look for old reservations, for which a read lock is enough
+   readLock.lock();
+   try {
+     long archivalTime = tick - policy.getValidWindow();
+     ReservationInterval searchInterval =
+         new ReservationInterval(archivalTime, archivalTime);
+     for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
+         .headMap(searchInterval, true).values()) {
+       for (InMemoryReservationAllocation reservation : reservationEntries) {
+         if (reservation.getEndTime() <= archivalTime) {
+           expiredReservations.add(reservation);
+         }
+       }
+     }
+   } finally {
+     readLock.unlock();
+   }
+   if (expiredReservations.isEmpty()) {
+     return;
+   }
+   // Phase 2: need write lock only if there are reservations to be deleted
+   writeLock.lock();
+   try {
+     for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
+       // The plan may have changed between releasing the read lock and
+       // acquiring the write lock: only remove the reservation if the very
+       // same instance is still registered under its ID
+       if (reservationTable.get(expiredReservation.getReservationId())
+           != expiredReservation) {
+         LOG.debug("Skipping archival of reservation: {} as it was"
+             + " concurrently modified.",
+             expiredReservation.getReservationId());
+         continue;
+       }
+       removeReservation(expiredReservation);
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the reservations found in the head of the interval index up to
+  * and including {@code (tick, Long.MAX_VALUE)} whose end time is strictly
+  * after the given tick.
+  *
+  * @param tick the point in time of interest
+  * @return an unmodifiable set of matching reservations; the shared empty
+  *         set if the head of the index is empty
+  */
+ @Override
+ public Set<ReservationAllocation> getReservationsAtTime(long tick) {
+   readLock.lock();
+   final ReservationInterval upperBound =
+       new ReservationInterval(tick, Long.MAX_VALUE);
+   try {
+     final SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> candidates =
+         currentReservations.headMap(upperBound, true);
+     if (candidates.isEmpty()) {
+       return Collections.emptySet();
+     }
+     final Set<ReservationAllocation> active =
+         new HashSet<ReservationAllocation>();
+     for (Set<InMemoryReservationAllocation> sameInterval : candidates
+         .values()) {
+       for (InMemoryReservationAllocation candidate : sameInterval) {
+         // keep only reservations that have not yet ended at tick
+         if (candidate.getEndTime() > tick) {
+           active.add(candidate);
+         }
+       }
+     }
+     return Collections.unmodifiableSet(active);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ @Override
+ public long getStep() {
+ return step;
+ }
+
+ @Override
+ public SharingPolicy getSharingPolicy() {
+ return policy;
+ }
+
+ @Override
+ public ReservationAgent getReservationAgent() {
+ return agent;
+ }
+
+ /**
+  * Returns the capacity recorded for the given user at time {@code t}.
+  *
+  * @param user the user of interest
+  * @param t the point in time of interest
+  * @return the user's capacity at {@code t}, or a fresh zero resource if
+  *         nothing is tracked for the user
+  */
+ @Override
+ public Resource getConsumptionForUser(String user, long t) {
+   readLock.lock();
+   try {
+     final RLESparseResourceAllocation tracked = userResourceAlloc.get(user);
+     if (tracked == null) {
+       return Resources.clone(ZERO_RESOURCE);
+     }
+     return tracked.getCapacityAtTime(t);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ @Override
+ public Resource getTotalCommittedResources(long t) {
+ readLock.lock();
+ try {
+ return rleSparseVector.getCapacityAtTime(t);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public ReservationAllocation getReservationById(ReservationId reservationID) {
+ if (reservationID == null) {
+ return null;
+ }
+ readLock.lock();
+ try {
+ return reservationTable.get(reservationID);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Resource getTotalCapacity() {
+ readLock.lock();
+ try {
+ return Resources.clone(totalCapacity);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Resource getMinimumAllocation() {
+ return Resources.clone(minAlloc);
+ }
+
+ @Override
+ public void setTotalCapacity(Resource cap) {
+ writeLock.lock();
+ try {
+ totalCapacity = Resources.clone(cap);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public long getEarliestStartTime() {
+ readLock.lock();
+ try {
+ return rleSparseVector.getEarliestStartTime();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastEndTime() {
+ readLock.lock();
+ try {
+ return rleSparseVector.getLatestEndTime();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public ResourceCalculator getResourceCalculator() {
+ return resCalc;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public Resource getMaximumAllocation() {
+ return Resources.clone(maxAlloc);
+ }
+
+ public String toCumulativeString() {
+ readLock.lock();
+ try {
+ return rleSparseVector.toString();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Planner getReplanner() {
+ return replanner;
+ }
+
+ @Override
+ public boolean getMoveOnExpiry() {
+ return getMoveOnExpiry;
+ }
+
+ /**
+  * Renders the queue name, total capacity and step of the plan followed by
+  * every reservation it currently holds.
+  */
+ @Override
+ public String toString() {
+   readLock.lock();
+   try {
+     final StringBuilder text = new StringBuilder("In-memory Plan: ");
+     text.append("Parent Queue: ");
+     text.append(queueName);
+     text.append("Total Capacity: ");
+     text.append(totalCapacity);
+     text.append("Step: ");
+     text.append(step);
+     for (ReservationAllocation held : getAllReservations()) {
+       text.append(held);
+     }
+     return text.toString();
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
new file mode 100644
index 0000000..10cc55f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -0,0 +1,151 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * An in memory implementation of a reservation allocation using the
+ * {@link RLESparseResourceAllocation}
+ *
+ */
+class InMemoryReservationAllocation implements ReservationAllocation {
+
+ private final String planName;
+ private final ReservationId reservationID;
+ private final String user;
+ private final ReservationDefinition contract;
+ private final long startTime;
+ private final long endTime;
+ private final Map<ReservationInterval, ReservationRequest> allocationRequests;
+ private boolean hasGang = false;
+ private long acceptedAt = -1;
+
+ private RLESparseResourceAllocation resourcesOverTime;
+
+ InMemoryReservationAllocation(ReservationId reservationID,
+ ReservationDefinition contract, String user, String planName,
+ long startTime, long endTime,
+ Map<ReservationInterval, ReservationRequest> allocationRequests,
+ ResourceCalculator calculator, Resource minAlloc) {
+ this.contract = contract;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.reservationID = reservationID;
+ this.user = user;
+ this.allocationRequests = allocationRequests;
+ this.planName = planName;
+ resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
+ for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
+ .entrySet()) {
+ resourcesOverTime.addInterval(r.getKey(), r.getValue());
+ if (r.getValue().getConcurrency() > 1) {
+ hasGang = true;
+ }
+ }
+ }
+
+ @Override
+ public ReservationId getReservationId() {
+ return reservationID;
+ }
+
+ @Override
+ public ReservationDefinition getReservationDefinition() {
+ return contract;
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public long getEndTime() {
+ return endTime;
+ }
+
+ @Override
+ public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
+ return Collections.unmodifiableMap(allocationRequests);
+ }
+
+ @Override
+ public String getPlanName() {
+ return planName;
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public boolean containsGangs() {
+ return hasGang;
+ }
+
+ @Override
+ public void setAcceptanceTimestamp(long acceptedAt) {
+ this.acceptedAt = acceptedAt;
+ }
+
+ @Override
+ public long getAcceptanceTime() {
+ return acceptedAt;
+ }
+
+ /**
+  * Returns a copy of the resources reserved at the given tick, or a zero
+  * resource when the tick lies outside {@code [startTime, endTime)}.
+  */
+ @Override
+ public Resource getResourcesAtTime(long tick) {
+   final boolean withinReservation = tick >= startTime && tick < endTime;
+   if (withinReservation) {
+     return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
+   }
+   return Resource.newInstance(0, 0);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sBuf = new StringBuilder();
+ sBuf.append(getReservationId()).append(" user:").append(getUser())
+ .append(" startTime: ").append(getStartTime()).append(" endTime: ")
+ .append(getEndTime()).append(" alloc:[")
+ .append(resourcesOverTime.toString()).append("] ");
+ return sBuf.toString();
+ }
+
+ /**
+  * Orders allocations by acceptance time in reverse: the allocation
+  * accepted later compares as smaller.
+  */
+ @Override
+ public int compareTo(ReservationAllocation other) {
+   final long mine = this.getAcceptanceTime();
+   final long theirs = other.getAcceptanceTime();
+   if (mine == theirs) {
+     return 0;
+   }
+   return (mine > theirs) ? -1 : 1;
+ }
+
+ @Override
+ public int hashCode() {
+ return reservationID.hashCode();
+ }
+
+ /**
+  * Two allocations are equal when they are of the same class and carry
+  * equal reservation IDs.
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (this == obj) {
+     return true;
+   }
+   if (obj == null || getClass() != obj.getClass()) {
+     return false;
+   }
+   final InMemoryReservationAllocation that =
+       (InMemoryReservationAllocation) obj;
+   return this.reservationID.equals(that.getReservationId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
new file mode 100644
index 0000000..cf2aed7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -0,0 +1,32 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * A Plan represents the central data structure of a reservation system that
+ * maintains the "agenda" for the cluster. In particular, it maintains
+ * information on how a set of {@link ReservationDefinition} that have been
+ * previously accepted will be honored.
+ *
+ * {@link ReservationDefinition} submitted by the users through the RM public
+ * APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
+ * consult the Plan (via the {@link PlanView} interface) and try to determine
+ * whether there are sufficient resources available in this Plan to satisfy the
+ * temporal and resource constraints of a {@link ReservationDefinition}. If a
+ * valid allocation is found the agent will try to store it in the plan (via the
+ * {@link PlanEdit} interface). Upon success the system returns to the user a
+ * positive acknowledgment, and a reservation identifier to be later used to
+ * access the reserved resources.
+ *
+ * A {@link PlanFollower} will continuously read from the Plan and will
+ * affect the instantaneous allocation of resources among jobs running by
+ * publishing the "current" slice of the Plan to the underlying scheduler. I.e.,
+ * the configuration of queues/weights of the scheduler are modified to reflect
+ * the allocations in the Plan.
+ *
+ * As this interface has several methods we decompose them into three groups:
+ * {@link PlanContext}: containing configuration type information,
+ * {@link PlanView} read-only access to the plan state, and {@link PlanEdit}
+ * write access to the plan state.
+ */
+public interface Plan extends PlanContext, PlanView, PlanEdit {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
new file mode 100644
index 0000000..40a25a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+/**
+ * This interface provides read-only access to configuration-type parameters
+ * for a plan.
+ *
+ */
+public interface PlanContext {
+
+ /**
+ * Returns the configured "step" or granularity of time of the plan in millis.
+ *
+ * @return plan step in millis
+ */
+ public long getStep();
+
+ /**
+ * Return the {@link ReservationAgent} configured for this plan that is
+ * responsible for optimally placing various reservation requests
+ *
+ * @return the {@link ReservationAgent} configured for this plan
+ */
+ public ReservationAgent getReservationAgent();
+
+ /**
+ * Return an instance of a {@link Planner}, which will be invoked in response
+ * to unexpected reduction in the resources of this plan
+ *
+ * @return an instance of a {@link Planner}, which will be invoked in response
+ * to unexpected reduction in the resources of this plan
+ */
+ public Planner getReplanner();
+
+ /**
+ * Return the configured {@link SharingPolicy} that governs the sharing of the
+ * resources of the plan between its various users
+ *
+ * @return the configured {@link SharingPolicy} that governs the sharing of
+ * the resources of the plan between its various users
+ */
+ public SharingPolicy getSharingPolicy();
+
+ /**
+ * Returns the system {@link ResourceCalculator}
+ *
+ * @return the system {@link ResourceCalculator}
+ */
+ public ResourceCalculator getResourceCalculator();
+
+ /**
+ * Returns the single smallest {@link Resource} allocation that can be
+ * reserved in this plan
+ *
+ * @return the single smallest {@link Resource} allocation that can be
+ * reserved in this plan
+ */
+ public Resource getMinimumAllocation();
+
+ /**
+ * Returns the single largest {@link Resource} allocation that can be reserved
+ * in this plan
+ *
+ * @return the single largest {@link Resource} allocation that can be reserved
+ * in this plan
+ */
+ public Resource getMaximumAllocation();
+
+ /**
+ * Return the name of the queue in the {@link ResourceScheduler} corresponding
+ * to this plan
+ *
+ * @return the name of the queue in the {@link ResourceScheduler}
+ * corresponding to this plan
+ */
+ public String getQueueName();
+
+ /**
+ * Return the {@link QueueMetrics} for the queue in the
+ * {@link ResourceScheduler} corresponding to this plan
+ *
+ * @return the {@link QueueMetrics} for the queue in the
+ * {@link ResourceScheduler} corresponding to this plan
+ */
+ public QueueMetrics getQueueMetrics();
+
+ /**
+ * Instructs the {@link PlanFollower} on what to do for applications
+ * which are still running when the reservation is expiring (move-to-default
+ * vs kill)
+ *
+ * @return true if remaining applications have to be killed, false if they
+ * have to be migrated
+ */
+ public boolean getMoveOnExpiry();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
new file mode 100644
index 0000000..648edba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * This interface groups the methods used to modify the state of a Plan. It
+ * extends both {@link PlanContext} and {@link PlanView}, so an editable plan
+ * also exposes every accessor declared by those two interfaces.
+ */
+public interface PlanEdit extends PlanContext, PlanView {
+
+ /**
+ * Add a new {@link ReservationAllocation} to the plan
+ *
+ * @param reservation the {@link ReservationAllocation} to be added to the
+ * plan
+ * @return true if addition is successful, false otherwise
+ * @throws PlanningException if the reservation cannot be added; the exact
+ * conditions are defined by the implementation
+ */
+ public boolean addReservation(ReservationAllocation reservation)
+ throws PlanningException;
+
+ /**
+ * Updates an existing {@link ReservationAllocation} in the plan. This is
+ * required for re-negotiation
+ *
+ * @param reservation the {@link ReservationAllocation} to be updated in the
+ * plan
+ * @return true if update is successful, false otherwise
+ * @throws PlanningException if the reservation cannot be updated; the exact
+ * conditions are defined by the implementation
+ */
+ public boolean updateReservation(ReservationAllocation reservation)
+ throws PlanningException;
+
+ /**
+ * Delete an existing {@link ReservationAllocation} from the plan identified
+ * uniquely by its {@link ReservationId}. This will generally be used for
+ * garbage collection
+ *
+ * @param reservationID the {@link ReservationId} that uniquely identifies
+ * the {@link ReservationAllocation} to be deleted from the plan
+ * @return true if delete is successful, false otherwise
+ * @throws PlanningException if the reservation cannot be deleted; the exact
+ * conditions are defined by the implementation
+ */
+ public boolean deleteReservation(ReservationId reservationID)
+ throws PlanningException;
+
+ /**
+ * Method invoked to garbage collect old reservations. It cleans up expired
+ * reservations that have fallen out of the sliding archival window
+ *
+ * @param tick the current time from which the archival window is computed
+ * @throws PlanningException if the clean-up fails; the exact conditions are
+ * defined by the implementation
+ */
+ public void archiveCompletedReservations(long tick) throws PlanningException;
+
+ /**
+ * Sets the overall capacity in terms of {@link Resource} assigned to this
+ * plan
+ *
+ * @param capacity the overall capacity in terms of {@link Resource} assigned
+ * to this plan
+ */
+ public void setTotalCapacity(Resource capacity);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
new file mode 100644
index 0000000..6e58dde
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -0,0 +1,89 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This interface provides a read-only view on the allocations made in this
+ * plan. These methods are used for example by {@link ReservationAgent}s to
+ * determine the free resources at a certain point in time, and by
+ * PlanFollowerPolicy to publish this plan to the scheduler.
+ */
+public interface PlanView extends PlanContext {
+
+ /**
+ * Return a {@link ReservationAllocation} identified by its
+ * {@link ReservationId}
+ *
+ * @param reservationID the unique id to identify the
+ * {@link ReservationAllocation}
+ * @return {@link ReservationAllocation} identified by the specified id
+ */
+ public ReservationAllocation getReservationById(ReservationId reservationID);
+
+ /**
+ * Gets all the active reservations at the specified point of time
+ *
+ * @param tick the time (UTC in ms) for which the active reservations are
+ * requested
+ * @return set of active reservations at the specified time
+ */
+ public Set<ReservationAllocation> getReservationsAtTime(long tick);
+
+ /**
+ * Gets all the reservations in the plan
+ *
+ * @return set of all reservations handled by this Plan
+ */
+ public Set<ReservationAllocation> getAllReservations();
+
+ /**
+ * Returns the total {@link Resource} reserved for all users at the specified
+ * time
+ *
+ * @param tick the time (UTC in ms) for which the reserved resources are
+ * requested
+ * @return the total {@link Resource} reserved for all users at the specified
+ * time
+ */
+ public Resource getTotalCommittedResources(long tick);
+
+ /**
+ * Returns the total {@link Resource} reserved for a given user at the
+ * specified time
+ *
+ * @param user the user who made the reservation(s)
+ * @param tick the time (UTC in ms) for which the reserved resources are
+ * requested
+ * @return the total {@link Resource} reserved for a given user at the
+ * specified time
+ */
+ public Resource getConsumptionForUser(String user, long tick);
+
+ /**
+ * Returns the overall capacity in terms of {@link Resource} assigned to this
+ * plan (typically will correspond to the absolute capacity of the
+ * corresponding queue).
+ *
+ * @return the overall capacity in terms of {@link Resource} assigned to this
+ * plan
+ */
+ public Resource getTotalCapacity();
+
+ /**
+ * Gets the time (UTC in ms) at which the first reservation starts
+ *
+ * @return the time (UTC in ms) at which the first reservation starts
+ */
+ public long getEarliestStartTime();
+
+ /**
+ * Returns the time (UTC in ms) at which the last reservation terminates
+ *
+ * @return the time (UTC in ms) at which the last reservation terminates
+ */
+ public long getLastEndTime();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
new file mode 100644
index 0000000..fa8db30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -0,0 +1,332 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * This is a run length encoded sparse data structure that maintains resource
+ * allocations over time
+ */
+public class RLESparseResourceAllocation {
+
+ private static final int THRESHOLD = 100;
+ private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+ private TreeMap<Long, Resource> cumulativeCapacity =
+ new TreeMap<Long, Resource>();
+
+ private final ReentrantReadWriteLock readWriteLock =
+ new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ private final ResourceCalculator resourceCalculator;
+ private final Resource minAlloc;
+
+  /**
+   * Creates an allocation that initially holds no steps.
+   *
+   * @param resourceCalculator the {@link ResourceCalculator} handed to
+   *          {@code Resources.divide} when composite requests are aggregated
+   * @param minAlloc the {@link Resource} used as the per-container capability
+   *          of aggregated composite requests
+   */
+  public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
+      Resource minAlloc) {
+    this.minAlloc = minAlloc;
+    this.resourceCalculator = resourceCalculator;
+  }
+
+  /**
+   * Tells whether the step located strictly before {@code key} already holds
+   * a value equal to {@code capacity}.
+   *
+   * @param key the position whose predecessor is inspected
+   * @param capacity the value to compare the predecessor against
+   * @return true only if a predecessor exists and its value equals
+   *         {@code capacity}
+   */
+  private boolean isSameAsPrevious(Long key, Resource capacity) {
+    Entry<Long, Resource> before = cumulativeCapacity.lowerEntry(key);
+    if (before == null) {
+      return false;
+    }
+    return before.getValue().equals(capacity);
+  }
+
+  /**
+   * Tells whether the step located strictly after {@code key} already holds a
+   * value equal to {@code capacity}.
+   *
+   * @param key the position whose successor is inspected
+   * @param capacity the value to compare the successor against
+   * @return true only if a successor exists and its value equals
+   *         {@code capacity}
+   */
+  private boolean isSameAsNext(Long key, Resource capacity) {
+    Entry<Long, Resource> after = cumulativeCapacity.higherEntry(key);
+    if (after == null) {
+      return false;
+    }
+    return after.getValue().equals(capacity);
+  }
+
+ /**
+ * Add a resource for the specified interval. The amount added is the
+ * request's capability multiplied by its number of containers; when that
+ * amount equals the zero resource the method returns right away and the
+ * stored steps are left untouched. Otherwise, while holding the write lock,
+ * the step at the interval start is raised by that amount, every existing
+ * step strictly between the start and the end is raised by the same amount,
+ * and a step is placed at (or merged into) the interval end.
+ *
+ * @param reservationInterval the interval for which the resource is to be
+ * added
+ * @param capacity the resource to be added
+ * @return true if addition is successful, false otherwise (the code below
+ * returns true on every path)
+ */
+ public boolean addInterval(ReservationInterval reservationInterval,
+ ReservationRequest capacity) {
+ Resource totCap =
+ Resources.multiply(capacity.getCapability(),
+ (float) capacity.getNumContainers());
+ if (totCap.equals(ZERO_RESOURCE)) {
+ return true;
+ }
+ writeLock.lock();
+ try {
+ long startKey = reservationInterval.getStartTime();
+ long endKey = reservationInterval.getEndTime();
+ NavigableMap<Long, Resource> ticks =
+ cumulativeCapacity.headMap(endKey, false);
+ if (ticks != null && !ticks.isEmpty()) {
+ Resource updatedCapacity = Resource.newInstance(0, 0);
+ Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
+ if (lowEntry == null) {
+ // No tick at or before the start key: this interval starts earliest
+ cumulativeCapacity.put(startKey, totCap);
+ } else {
+ updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
+ // If a tick sits exactly on the start key and its raised value equals
+ // its predecessor, drop it; otherwise store the raised value at start
+ if ((startKey == lowEntry.getKey())
+ && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
+ cumulativeCapacity.remove(lowEntry.getKey());
+ } else {
+ cumulativeCapacity.put(startKey, updatedCapacity);
+ }
+ }
+ // Raise every tick strictly between the start key and the end key
+ Set<Entry<Long, Resource>> overlapSet =
+ ticks.tailMap(startKey, false).entrySet();
+ for (Entry<Long, Resource> entry : overlapSet) {
+ updatedCapacity = Resources.add(entry.getValue(), totCap);
+ entry.setValue(updatedCapacity);
+ }
+ } else {
+ // No tick exists before the end key, so start a new step here
+ cumulativeCapacity.put(startKey, totCap);
+ }
+ Resource nextTick = cumulativeCapacity.get(endKey);
+ if (nextTick != null) {
+ // A tick already sits on the end key: drop it if it repeats its predecessor
+ if (isSameAsPrevious(endKey, nextTick)) {
+ cumulativeCapacity.remove(endKey);
+ }
+ } else {
+ // No tick on the end key yet: step back down by the amount just added
+ cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
+ .floorEntry(endKey).getValue(), totCap));
+ }
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+  /**
+   * Adds several requests for the same interval as one aggregated request.
+   * The capability of every request, multiplied by its number of containers,
+   * is summed up; the aggregated request then uses {@code minAlloc} as its
+   * capability and, as its container count, the rounded-up result of
+   * {@code Resources.divide} applied to that sum and {@code minAlloc}.
+   *
+   * @param reservationInterval the interval for which the resources are to be
+   *          added
+   * @param ReservationRequests the resources to be added
+   * @param clusterResource the total resources in the cluster
+   * @return the outcome of {@link #addInterval} for the aggregated request
+   */
+  public boolean addCompositeInterval(ReservationInterval reservationInterval,
+      List<ReservationRequest> ReservationRequests, Resource clusterResource) {
+    // Sum capability * numContainers over every request in the list
+    Resource total = Resource.newInstance(0, 0);
+    for (ReservationRequest request : ReservationRequests) {
+      Resource requested =
+          Resources.multiply(request.getCapability(),
+              request.getNumContainers());
+      Resources.addTo(total, requested);
+    }
+    // Round the ratio computed by Resources.divide up to a whole count
+    int numContainers =
+        (int) Math.ceil(Resources.divide(resourceCalculator, clusterResource,
+            total, minAlloc));
+    ReservationRequest aggregate = Records.newRecord(ReservationRequest.class);
+    aggregate.setNumContainers(numContainers);
+    aggregate.setCapability(minAlloc);
+    return addInterval(reservationInterval, aggregate);
+  }
+
+ /**
+ * Removes a resource for the specified interval. The amount removed is the
+ * request's capability multiplied by its number of containers; when that
+ * amount equals the zero resource the method returns right away. Otherwise,
+ * while holding the write lock, every stored step whose key lies in
+ * [start, end) is lowered by that amount, and steps that end up repeating a
+ * neighbouring value are dropped.
+ *
+ * @param reservationInterval the interval for which the resource is to be
+ * removed
+ * @param capacity the resource to be removed
+ * @return true if removal is successful, false otherwise (the code below
+ * returns true on every path)
+ */
+ public boolean removeInterval(ReservationInterval reservationInterval,
+ ReservationRequest capacity) {
+ Resource totCap =
+ Resources.multiply(capacity.getCapability(),
+ (float) capacity.getNumContainers());
+ if (totCap.equals(ZERO_RESOURCE)) {
+ return true;
+ }
+ writeLock.lock();
+ try {
+ long startKey = reservationInterval.getStartTime();
+ long endKey = reservationInterval.getEndTime();
+ // view of all the ticks strictly before the end key
+ NavigableMap<Long, Resource> ticks =
+ cumulativeCapacity.headMap(endKey, false);
+ // Narrow the view to the ticks at or after the start key
+ SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
+ if (overlapSet != null && !overlapSet.isEmpty()) {
+ Resource updatedCapacity = Resource.newInstance(0, 0);
+ long currentKey = -1;
+ for (Iterator<Entry<Long, Resource>> overlapEntries =
+ overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
+ Entry<Long, Resource> entry = overlapEntries.next();
+ currentKey = entry.getKey();
+ updatedCapacity = Resources.subtract(entry.getValue(), totCap);
+ // lower each entry between start and end key (existing key, new value)
+ cumulativeCapacity.put(currentKey, updatedCapacity);
+ }
+ // Remove the first overlap entry if, after the update, it is the same
+ // as the entry preceding it
+ Long firstKey = overlapSet.firstKey();
+ if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
+ cumulativeCapacity.remove(firstKey);
+ }
+ // Remove the entry after the last updated one if both now hold the same value
+ if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
+ cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
+ }
+ }
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+  /**
+   * Looks up the resources allocated at a given instant, i.e. the value of
+   * the closest step at or before {@code tick}. The result is always a copy,
+   * so callers may modify it freely.
+   *
+   * @param tick the time (UTC in ms) at which the capacity is requested
+   * @return a copy of the resources allocated at the specified time, or a
+   *         copy of the zero resource when no step exists at or before it
+   */
+  public Resource getCapacityAtTime(long tick) {
+    readLock.lock();
+    try {
+      Entry<Long, Resource> step = cumulativeCapacity.floorEntry(tick);
+      Resource found = (step == null) ? ZERO_RESOURCE : step.getValue();
+      return Resources.clone(found);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Reports the key of the first stored step.
+   *
+   * @return the timestamp of the first resource allocation, or -1 when
+   *         nothing is stored
+   */
+  public long getEarliestStartTime() {
+    readLock.lock();
+    try {
+      if (!cumulativeCapacity.isEmpty()) {
+        return cumulativeCapacity.firstKey();
+      }
+      return -1;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Reports the key of the last stored step.
+   *
+   * @return the timestamp of the last resource allocation, or -1 when
+   *         nothing is stored
+   */
+  public long getLatestEndTime() {
+    readLock.lock();
+    try {
+      if (!cumulativeCapacity.isEmpty()) {
+        return cumulativeCapacity.lastKey();
+      }
+      return -1;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+ /**
+ * Returns true if there are no non-zero entries
+ *
+ * @return true if there are no allocations or false otherwise
+ */
+ public boolean isEmpty() {
+ readLock.lock();
+ try {
+ if (cumulativeCapacity.isEmpty()) {
+ return true;
+ }
+ // Deletion leaves a single zero entry so check for that
+ if (cumulativeCapacity.size() == 1) {
+ return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
+ }
+ return false;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /**
+   * Renders the stored steps as text. Up to {@code THRESHOLD} steps are
+   * listed one per line as {@code key: value}; beyond that only the number of
+   * steps and the first and last keys are reported.
+   *
+   * @return the textual form of this allocation
+   */
+  @Override
+  public String toString() {
+    readLock.lock();
+    try {
+      StringBuilder text = new StringBuilder();
+      if (cumulativeCapacity.size() <= THRESHOLD) {
+        for (Entry<Long, Resource> step : cumulativeCapacity.entrySet()) {
+          text.append(step.getKey());
+          text.append(": ");
+          text.append(step.getValue());
+          text.append("\n ");
+        }
+      } else {
+        text.append("Number of steps: ");
+        text.append(cumulativeCapacity.size());
+        text.append(" earliest entry: ");
+        text.append(cumulativeCapacity.firstKey());
+        text.append(" latest entry: ");
+        text.append(cumulativeCapacity.lastKey());
+      }
+      return text.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Serializes the stored steps as a single JSON object whose member names
+   * are the step keys and whose member values are the string form of the
+   * corresponding {@link Resource}.
+   *
+   * @return the JSON string representation of the current resources allocated
+   *         over time, or an empty string if writing fails
+   */
+  public String toMemJSONString() {
+    StringWriter buffer = new StringWriter();
+    JsonWriter writer = new JsonWriter(buffer);
+    readLock.lock();
+    try {
+      writer.beginObject();
+      for (Entry<Long, Resource> step : cumulativeCapacity.entrySet()) {
+        writer.name(step.getKey().toString());
+        writer.value(step.getValue().toString());
+      }
+      writer.endObject();
+      writer.close();
+      return buffer.toString();
+    } catch (IOException e) {
+      // This should not happen
+      return "";
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
new file mode 100644
index 0000000..bca3aa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java
@@ -0,0 +1,104 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * A ReservationAllocation represents a concrete allocation of resources over
+ * time that satisfies a certain {@link ReservationDefinition}. This is used
+ * internally by a {@link Plan} to store information about how each accepted
+ * {@link ReservationDefinition} has been allocated.
+ */
+public interface ReservationAllocation extends
+ Comparable<ReservationAllocation> {
+
+ /**
+ * Returns the unique identifier {@link ReservationId} that represents the
+ * reservation
+ *
+ * @return reservationId the unique identifier {@link ReservationId} that
+ * represents the reservation
+ */
+ public ReservationId getReservationId();
+
+ /**
+ * Returns the original {@link ReservationDefinition} submitted by the client
+ *
+ * @return the original {@link ReservationDefinition} submitted by the client
+ */
+ public ReservationDefinition getReservationDefinition();
+
+ /**
+ * Returns the time at which the reservation is activated
+ *
+ * @return the time at which the reservation is activated
+ */
+ public long getStartTime();
+
+ /**
+ * Returns the time at which the reservation terminates
+ *
+ * @return the time at which the reservation terminates
+ */
+ public long getEndTime();
+
+ /**
+ * Returns the map that associates each {@link ReservationInterval} with the
+ * {@link ReservationRequest} describing the resources for that interval
+ *
+ * @return the allocationRequests, i.e. the map from each
+ * {@link ReservationInterval} to its {@link ReservationRequest}
+ */
+ public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
+
+ /**
+ * Return a string identifying the plan to which the reservation belongs
+ *
+ * @return the plan to which the reservation belongs
+ */
+ public String getPlanName();
+
+ /**
+ * Returns the user who requested the reservation
+ *
+ * @return the user who requested the reservation
+ */
+ public String getUser();
+
+ /**
+ * Returns whether the reservation has gang semantics or not
+ *
+ * @return true if there is a gang request, false otherwise
+ */
+ public boolean containsGangs();
+
+ /**
+ * Sets the time at which the reservation was accepted by the system
+ *
+ * @param acceptedAt the time at which the reservation was accepted by the
+ * system
+ */
+ public void setAcceptanceTimestamp(long acceptedAt);
+
+ /**
+ * Returns the time at which the reservation was accepted by the system
+ *
+ * @return the time at which the reservation was accepted by the system
+ */
+ public long getAcceptanceTime();
+
+ /**
+ * Returns the capacity represented by cumulative resources reserved by the
+ * reservation at the specified point of time
+ *
+ * @param tick the time (UTC in ms) for which the reserved resources are
+ * requested
+ * @return the resources reserved at the specified time
+ */
+ public Resource getResourcesAtTime(long tick);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
new file mode 100644
index 0000000..d3a6d51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java
@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+/**
+ * This represents the time duration of the reservation as an immutable pair
+ * of start and end times. Intervals are ordered by start time first and by
+ * end time second.
+ */
+public class ReservationInterval implements Comparable<ReservationInterval> {
+
+  private final long startTime;
+
+  private final long endTime;
+
+  /**
+   * Creates an interval from the given bounds. The bounds are stored as
+   * given; no check is made that the start precedes the end.
+   *
+   * @param startTime the start time of the reservation interval
+   * @param endTime the end time of the reservation interval
+   */
+  public ReservationInterval(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+  }
+
+  /**
+   * Get the start time of the reservation interval
+   *
+   * @return the startTime
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the end time of the reservation interval
+   *
+   * @return the endTime
+   */
+  public long getEndTime() {
+    return endTime;
+  }
+
+  /**
+   * Returns whether the interval is active at the specified instant of time.
+   * Both bounds are inclusive.
+   *
+   * @param tick the instant of time to check
+   * @return true if active, false otherwise
+   */
+  public boolean isOverlap(long tick) {
+    return (startTime <= tick && tick <= endTime);
+  }
+
+  /**
+   * Orders intervals by start time and, when the start times are equal, by
+   * end time.
+   *
+   * @param anotherInterval the interval to compare this one against
+   * @return -1, 0 or 1 as this interval is less than, equal to, or greater
+   *         than {@code anotherInterval}
+   */
+  @Override
+  public int compareTo(ReservationInterval anotherInterval) {
+    // Compare the values directly rather than subtracting them: the
+    // difference of two longs can overflow and report the wrong sign
+    int byStart = compareLongs(startTime, anotherInterval.getStartTime());
+    if (byStart != 0) {
+      return byStart;
+    }
+    return compareLongs(endTime, anotherInterval.getEndTime());
+  }
+
+  /** Three-way comparison of two longs that cannot overflow. */
+  private static int compareLongs(long left, long right) {
+    if (left < right) {
+      return -1;
+    }
+    return (left > right) ? 1 : 0;
+  }
+
+  /**
+   * Renders the interval as {@code [startTime, endTime]}.
+   *
+   * @return the textual form of this interval
+   */
+  @Override
+  public String toString() {
+    return "[" + startTime + ", " + endTime + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
new file mode 100644
index 0000000..aa9e9fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java
@@ -0,0 +1,25 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+
+/**
+ * Checked exception thrown by the admission control subsystem when there is a
+ * problem in trying to find an allocation for a user
+ * {@link ReservationSubmissionRequest}. Each constructor simply forwards its
+ * arguments (a message, a cause, or both) to the matching {@link Exception}
+ * constructor.
+ */
+public class PlanningException extends Exception {
+
+ private static final long serialVersionUID = -684069387367879218L;
+
+ public PlanningException(String message) {
+ super(message);
+ }
+
+ public PlanningException(Throwable cause) {
+ super(cause);
+ }
+
+ public PlanningException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/056b7f57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
new file mode 100644
index 0000000..cbca6dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -0,0 +1,210 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+public class ReservationSystemTestUtil {
+
+ private static Random rand = new Random();
+
+ public final static String reservationQ = "dedicated";
+
+ /**
+  * Creates a reservation id from two consecutive pseudo-random longs drawn
+  * from this class's static {@code rand} generator.
+  *
+  * @return a newly built {@link ReservationId}
+  */
+ public static ReservationId getNewReservationId() {
+   final long first = rand.nextLong();
+   final long second = rand.nextLong();
+   return ReservationId.newInstance(first, second);
+ }
+
+ /**
+  * Builds a Mockito spy of a {@link CapacityScheduler} that is initialized
+  * with the queue layout produced by {@link #setupQueueConfiguration} and
+  * stubbed to report
+  * {@code Resource.newInstance(numContainers * 1024, numContainers)} as its
+  * cluster resource (presumably 1024 MB and one vcore per container; confirm
+  * against {@link Resource}).
+  *
+  * @param numContainers number of containers used to size the stubbed
+  *          cluster resource
+  * @return the spied scheduler, wired to a spied {@link RMContext} whose
+  *         {@code getScheduler()} returns it
+  * @throws IOException declared for callers; not thrown directly by the
+  *           visible code of this method
+  * @throws AssertionError if the scheduler fails to initialize; the original
+  *           exception is attached as the cause
+  */
+ public CapacityScheduler mockCapacityScheduler(int numContainers)
+     throws IOException {
+   // stolen from TestCapacityScheduler
+   CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+   setupQueueConfiguration(conf);
+
+   CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
+   cs.setConf(new YarnConfiguration());
+   RMContext mockRmContext =
+       Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
+           new RMContainerTokenSecretManager(conf),
+           new NMTokenSecretManagerInRM(conf),
+           new ClientToAMTokenSecretManagerInRM(), null));
+   cs.setRMContext(mockRmContext);
+   try {
+     cs.serviceInit(conf);
+   } catch (Exception e) {
+     // Fail the test but keep the original exception as the cause: failing
+     // with only e.getMessage() drops the stack trace and yields an empty
+     // failure when the message is null.
+     AssertionError failure =
+         new AssertionError("CapacityScheduler initialization failed: " + e);
+     failure.initCause(e);
+     throw failure;
+   }
+   when(mockRmContext.getScheduler()).thenReturn(cs);
+   Resource r = Resource.newInstance(numContainers * 1024, numContainers);
+   doReturn(r).when(cs).getClusterResource();
+   return cs;
+ }
+
+ /**
+  * Populates {@code conf} with the queue hierarchy used by the reservation
+  * tests: under the root, {@code default} (capacity 10), {@code a}
+  * (capacity 10, with children {@code a1} at 30 and {@code a2} at 70) and
+  * the queue named by {@code reservationQ} (capacity 80), which is flagged
+  * as reservable.
+  *
+  * @param conf the configuration to fill in
+  */
+ public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+   final String root = CapacitySchedulerConfiguration.ROOT;
+
+   // default queue
+   conf.setCapacity(root + ".default", 10);
+
+   // children of the root queue
+   final String[] topLevelQueues = { "default", "a", reservationQ };
+   conf.setQueues(root, topLevelQueues);
+
+   final String queueA = root + ".a";
+   conf.setCapacity(queueA, 10);
+
+   // the reservable queue takes the remaining 80
+   final String reservable =
+       root + CapacitySchedulerConfiguration.DOT + reservationQ;
+   conf.setCapacity(reservable, 80);
+   conf.setReservableQueue(reservable, true);
+
+   // children of queue "a"
+   final String[] childrenOfA = { "a1", "a2" };
+   conf.setQueues(queueA, childrenOfA);
+   conf.setCapacity(queueA + ".a1", 30);
+   conf.setCapacity(queueA + ".a2", 70);
+ }
+
+ /**
+  * @return the reservation queue name prefixed with the root queue name and
+  *         the queue-path separator
+  */
+ public String getFullReservationQueueName() {
+   final String rootPrefix =
+       CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT;
+   return rootPrefix + reservationQ;
+ }
+
+ /**
+  * @return the unqualified name of the reservation queue
+  */
+ public String getreservationQueueName() {
+   return ReservationSystemTestUtil.reservationQ;
+ }
+
+ /**
+  * Rewrites the queue hierarchy in {@code conf} so that it contains an
+  * additional reservable top-level queue. The resulting capacities are
+  * {@code default} = 5, {@code a} = 5 (children {@code a1} = 30 and
+  * {@code a2} = 70), the queue named by {@code reservationQ} = 80 and
+  * {@code newQ} = 10; both of the latter are flagged as reservable.
+  *
+  * @param conf the configuration to update
+  * @param newQ short name of the extra reservable queue
+  */
+ public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
+     String newQ) {
+   final String root = CapacitySchedulerConfiguration.ROOT;
+   final String rootDot = root + CapacitySchedulerConfiguration.DOT;
+
+   // default queue
+   conf.setCapacity(rootDot + "default", 5);
+
+   // children of the root queue, now including newQ
+   final String[] topLevelQueues = { "default", "a", reservationQ, newQ };
+   conf.setQueues(root, topLevelQueues);
+
+   final String queueA = rootDot + "a";
+   conf.setCapacity(queueA, 5);
+
+   // existing reservable queue
+   final String reservable = rootDot + reservationQ;
+   conf.setCapacity(reservable, 80);
+   conf.setReservableQueue(reservable, true);
+
+   // additional reservable queue
+   conf.setCapacity(rootDot + newQ, 10);
+   conf.setReservableQueue(rootDot + newQ, true);
+
+   // children of queue "a"
+   final String[] childrenOfA = { "a1", "a2" };
+   conf.setQueues(queueA, childrenOfA);
+   conf.setCapacity(queueA + ".a1", 30);
+   conf.setCapacity(queueA + ".a2", 70);
+ }
+
+ /**
+  * Generates a pseudo-random reservation definition holding a single
+  * request. The generator is re-seeded with {@code i}, so the random offsets
+  * and sizes depend only on {@code i}; the absolute arrival and deadline are
+  * additionally shifted by the current wall-clock time.
+  *
+  * @param rand generator to use; it is re-seeded by this call
+  * @param i seed applied to {@code rand}
+  * @return a definition whose arrival lies within 12 hours from now and whose
+  *         deadline lies within 24 hours after the arrival
+  */
+ public static ReservationDefinition generateRandomRR(Random rand, long i) {
+   rand.setSeed(i);
+   final long now = System.currentTimeMillis();
+
+   // Draw every random quantity up front. The order of these calls
+   // determines the values obtained for a given seed, so it must not change.
+   final long arrivalOffset = rand.nextInt(12 * 3600 * 1000); // within 12h
+   final long deadlineOffset =
+       arrivalOffset + rand.nextInt(24 * 3600 * 1000); // up to 24h later
+   final int gangSize = 1 + rand.nextInt(9);
+   final int totalAsk = (rand.nextInt(1000) + 1) * gangSize;
+   final long durationMs = rand.nextInt(2 * 3600 * 1000); // within 2h
+   rand.nextInt(3); // result unused; the draw still advances the sequence
+   final ReservationRequestInterpreter[] interpreters =
+       ReservationRequestInterpreter.values();
+   final ReservationRequestInterpreter chosen =
+       interpreters[rand.nextInt(interpreters.length)];
+
+   // a single atomic ask wrapped into the request list
+   final ReservationRequest ask =
+       ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+           totalAsk, gangSize, durationMs);
+   final ReservationRequests requests = new ReservationRequestsPBImpl();
+   requests.setReservationResources(Collections.singletonList(ask));
+   requests.setInterpreter(chosen);
+
+   final ReservationDefinition definition = new ReservationDefinitionPBImpl();
+   definition.setArrival(now + arrivalOffset);
+   definition.setDeadline(now + deadlineOffset);
+   definition.setReservationRequests(requests);
+   return definition;
+ }
+
+ /**
+  * Generates a large reservation definition holding a single request built
+  * with {@code ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+  * 100000, 1, dur)}, where {@code dur} is random and below one minute. The
+  * generator is re-seeded with {@code i}; the absolute arrival and deadline
+  * are shifted by the current wall-clock time.
+  *
+  * @param rand generator to use; it is re-seeded by this call
+  * @param i seed applied to {@code rand}
+  * @return a definition whose arrival lies within 2 hours from now and whose
+  *         deadline lies within 24 hours after the arrival
+  */
+ public static ReservationDefinition generateBigRR(Random rand, long i) {
+   rand.setSeed(i);
+   long now = System.currentTimeMillis();
+
+   // start time at random in the next 2 hours
+   long arrival = rand.nextInt(2 * 3600 * 1000);
+   // deadline at random within a day after the arrival. Drawing it
+   // independently of the arrival could place it before the arrival; the
+   // offset form matches generateRandomRR.
+   long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
+
+   // create a request with a single atomic ask
+   ReservationDefinition rr = new ReservationDefinitionPBImpl();
+   rr.setArrival(now + arrival);
+   rr.setDeadline(now + deadline);
+
+   int gang = 1;
+   int par = 100000; // 100k tasks
+   long dur = rand.nextInt(60 * 1000); // random task duration below 1min
+   ReservationRequest r =
+       ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
+           gang, dur);
+   ReservationRequests reqs = new ReservationRequestsPBImpl();
+   reqs.setReservationResources(Collections.singletonList(r));
+   rand.nextInt(3); // result unused; the draw still advances the sequence
+   ReservationRequestInterpreter[] type =
+       ReservationRequestInterpreter.values();
+   reqs.setInterpreter(type[rand.nextInt(type.length)]);
+   rr.setReservationRequests(reqs);
+
+   return rr;
+ }
+
+ /**
+  * Builds a sorted map with one entry per element of {@code alloc}. Entry
+  * {@code i} is keyed by the {@link ReservationInterval} constructed from
+  * {@code startTime + i * step} and {@code startTime + (i + 1) * step}, and
+  * its value is
+  * {@code ReservationRequest.newInstance(Resource.newInstance(1024, 1), alloc[i])}.
+  *
+  * @param startTime start of the first interval
+  * @param step length of every interval
+  * @param alloc per-interval counts, one element per consecutive interval
+  * @return a {@link TreeMap} from interval to request
+  */
+ public static Map<ReservationInterval, ReservationRequest> generateAllocation(
+     long startTime, long step, int[] alloc) {
+   final Map<ReservationInterval, ReservationRequest> allocation =
+       new TreeMap<ReservationInterval, ReservationRequest>();
+   // running start of the current interval; advanced by one step per element
+   long from = startTime;
+   for (int count : alloc) {
+     final long to = from + step;
+     final ReservationInterval slot = new ReservationInterval(from, to);
+     final ReservationRequest ask =
+         ReservationRequest.newInstance(Resource.newInstance(1024, 1), count);
+     allocation.put(slot, ask);
+     from = to;
+   }
+   return allocation;
+ }
+
+}