You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2017/09/03 23:55:56 UTC
[76/77] [abbrv] hadoop git commit: Plan/ResourceAllocation data
structure enhancements required to support recurring reservations in
ReservationSystem.
Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7996eca7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7996eca7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7996eca7
Branch: refs/heads/yarn-3409
Commit: 7996eca7dcfaa1bdf970e32022274f2699bef8a1
Parents: c5281a8
Author: Subru Krishnan <su...@apache.org>
Authored: Fri Sep 1 15:16:40 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Sep 1 15:16:40 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 6 +
.../yarn/conf/TestYarnConfigurationFields.java | 4 +-
.../reservation/AbstractReservationSystem.java | 90 ++--
.../AbstractSchedulerPlanFollower.java | 183 ++++----
.../reservation/InMemoryPlan.java | 400 ++++++++++++-----
.../InMemoryReservationAllocation.java | 36 +-
.../reservation/NoOverCommitPolicy.java | 2 +-
.../PeriodicRLESparseResourceAllocation.java | 130 ++++--
.../resourcemanager/reservation/PlanEdit.java | 24 +-
.../resourcemanager/reservation/PlanView.java | 94 ++--
.../RLESparseResourceAllocation.java | 115 ++---
.../reservation/ReservationAllocation.java | 60 ++-
.../reservation/ReservationInputValidator.java | 134 +++---
.../reservation/ReservationSystem.java | 6 +-
.../reservation/SharingPolicy.java | 13 +-
.../reservation/planning/Planner.java | 2 +-
.../reservation/planning/PlanningAlgorithm.java | 2 +-
.../reservation/planning/StageAllocator.java | 2 +-
.../planning/StageAllocatorGreedy.java | 2 +-
.../planning/StageAllocatorGreedyRLE.java | 5 +-
.../planning/StageAllocatorLowCostAligned.java | 4 +-
.../reservation/ReservationSystemTestUtil.java | 135 +++---
.../reservation/TestInMemoryPlan.java | 431 ++++++++++++-------
...TestPeriodicRLESparseResourceAllocation.java | 109 +++--
.../TestRLESparseResourceAllocation.java | 122 +++---
.../planning/TestSimpleCapacityReplanner.java | 8 +-
26 files changed, 1342 insertions(+), 777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4944821..27ca957 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -262,6 +262,12 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
1000L;
+ /** The maximum periodicity for the Reservation System. */
+ public static final String RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
+ RM_PREFIX + "reservation-system.max-periodicity";
+ public static final long DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY =
+ 86400000L;
+
/**
* Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index bd7bf93..1d3111c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.TestConfigurationFieldsBase;
*/
public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "methodlength"})
@Override
public void initializeMemberVariables() {
xmlFilename = new String("yarn-default.xml");
@@ -69,6 +69,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare.add(YarnConfiguration
.YARN_SECURITY_SERVICE_AUTHORIZATION_COLLECTOR_NODEMANAGER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
// Federation default configs to be ignored
configurationPropsToSkipCompare
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 5ef4912..5b8772c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -18,6 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -46,17 +57,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* This is the implementation of {@link ReservationSystem} based on the
* {@link ResourceScheduler}
@@ -66,8 +66,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractReservationSystem extends AbstractService
implements ReservationSystem {
- private static final Logger LOG = LoggerFactory
- .getLogger(AbstractReservationSystem.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractReservationSystem.class);
// private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
@@ -103,6 +103,8 @@ public abstract class AbstractReservationSystem extends AbstractService
private boolean isRecoveryEnabled = false;
+ private long maxPeriodicity;
+
/**
* Construct the service.
*
@@ -143,36 +145,41 @@ public abstract class AbstractReservationSystem extends AbstractService
this.conf = conf;
scheduler = rmContext.getScheduler();
// Get the plan step size
- planStepSize =
- conf.getTimeDuration(
- YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
- YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
- TimeUnit.MILLISECONDS);
+ planStepSize = conf.getTimeDuration(
+ YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ TimeUnit.MILLISECONDS);
if (planStepSize < 0) {
planStepSize =
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
}
+ maxPeriodicity =
+ conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
+ if (maxPeriodicity <= 0) {
+ maxPeriodicity =
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY;
+ }
// Create a plan corresponding to every reservable queue
Set<String> planQueueNames = scheduler.getPlanQueues();
for (String planQueueName : planQueueNames) {
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
}
- isRecoveryEnabled = conf.getBoolean(
- YarnConfiguration.RECOVERY_ENABLED,
+ isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
- YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) &&
- conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
- YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
+ YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE)
+ && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
+ YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
reservationsACLsManager = new ReservationsACLsManager(scheduler, conf);
}
}
private void loadPlan(String planName,
Map<ReservationId, ReservationAllocationStateProto> reservations)
- throws PlanningException {
+ throws PlanningException {
Plan plan = plans.get(planName);
Resource minAllocation = getMinAllocation();
ResourceCalculator rescCalculator = getResourceCalculator();
@@ -248,8 +255,8 @@ public abstract class AbstractReservationSystem extends AbstractService
Class<?> planFollowerPolicyClazz =
conf.getClassByName(planFollowerPolicyClassName);
if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
- return (PlanFollower) ReflectionUtils.newInstance(
- planFollowerPolicyClazz, conf);
+ return (PlanFollower) ReflectionUtils
+ .newInstance(planFollowerPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+ " not instance of " + PlanFollower.class.getCanonicalName());
@@ -257,7 +264,8 @@ public abstract class AbstractReservationSystem extends AbstractService
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate PlanFollowerPolicy: "
- + planFollowerPolicyClassName, e);
+ + planFollowerPolicyClassName,
+ e);
}
}
@@ -371,9 +379,8 @@ public abstract class AbstractReservationSystem extends AbstractService
public ReservationId getNewReservationId() {
writeLock.lock();
try {
- ReservationId resId =
- ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
- resCounter.incrementAndGet());
+ ReservationId resId = ReservationId.newInstance(
+ ResourceManager.getClusterTimeStamp(), resCounter.incrementAndGet());
LOG.info("Allocated new reservationId: " + resId);
return resId;
} finally {
@@ -390,8 +397,11 @@ public abstract class AbstractReservationSystem extends AbstractService
* Get the default reservation system corresponding to the scheduler
*
* @param scheduler the scheduler for which the reservation system is required
+ *
+ * @return the {@link ReservationSystem} based on the configured scheduler
*/
- public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
+ public static String getDefaultReservationSystem(
+ ResourceScheduler scheduler) {
if (scheduler instanceof CapacityScheduler) {
return CapacityReservationSystem.class.getName();
} else if (scheduler instanceof FairScheduler) {
@@ -409,12 +419,11 @@ public abstract class AbstractReservationSystem extends AbstractService
Resource maxAllocation = getMaxAllocation();
ResourceCalculator rescCalc = getResourceCalculator();
Resource totCap = getPlanQueueCapacity(planQueueName);
- Plan plan =
- new InMemoryPlan(getRootQueueMetrics(), adPolicy,
- getAgent(planQueuePath), totCap, planStepSize, rescCalc,
- minAllocation, maxAllocation, planQueueName,
- getReplanner(planQueuePath), getReservationSchedulerConfiguration()
- .getMoveOnExpiry(planQueuePath), rmContext);
+ Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
+ getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
+ maxAllocation, planQueueName, getReplanner(planQueuePath),
+ getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath),
+ maxPeriodicity, rmContext);
LOG.info("Initialized plan {} based on reservable queue {}",
plan.toString(), planQueueName);
return plan;
@@ -477,8 +486,8 @@ public abstract class AbstractReservationSystem extends AbstractService
Class<?> admissionPolicyClazz =
conf.getClassByName(admissionPolicyClassName);
if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
- return (SharingPolicy) ReflectionUtils.newInstance(
- admissionPolicyClazz, conf);
+ return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz,
+ conf);
} else {
throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ " not instance of " + SharingPolicy.class.getCanonicalName());
@@ -493,8 +502,7 @@ public abstract class AbstractReservationSystem extends AbstractService
return this.reservationsACLsManager;
}
- protected abstract ReservationSchedulerConfiguration
- getReservationSchedulerConfiguration();
+ protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();
protected abstract String getPlanQueuePath(String planQueueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
index 90357e3..9b6a0b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -18,6 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,24 +41,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
- private static final Logger LOG = LoggerFactory
- .getLogger(AbstractSchedulerPlanFollower.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class);
protected Collection<Plan> plans = new ArrayList<Plan>();
protected YarnScheduler scheduler;
protected Clock clock;
@Override
- public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+ public void init(Clock clock, ResourceScheduler sched,
+ Collection<Plan> plans) {
this.clock = clock;
this.scheduler = sched;
this.plans.addAll(plans);
@@ -71,7 +72,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
@Override
public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
- String planQueueName = plan.getQueueName();
+ String planQueueName = plan.getQueueName();
if (LOG.isDebugEnabled()) {
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
}
@@ -82,12 +83,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
now += step - (now % step);
}
Queue planQueue = getPlanQueue(planQueueName);
- if (planQueue == null) return;
+ if (planQueue == null) {
+ return;
+ }
// first we publish to the plan the current availability of resources
Resource clusterResources = scheduler.getClusterResource();
- Resource planResources = getPlanResources(plan, planQueue,
- clusterResources);
+ Resource planResources =
+ getPlanResources(plan, planQueue, clusterResources);
Set<ReservationAllocation> currentReservations =
plan.getReservationsAtTime(now);
Set<String> curReservationNames = new HashSet<String>();
@@ -95,12 +98,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
int numRes = getReservedResources(now, currentReservations,
curReservationNames, reservedResources);
// create the default reservation queue if it doesnt exist
- String defReservationId = getReservationIdFromQueueName(planQueueName) +
- ReservationConstants.DEFAULT_QUEUE_SUFFIX;
- String defReservationQueue = getReservationQueueName(planQueueName,
- defReservationId);
- createDefaultReservationQueue(planQueueName, planQueue,
- defReservationId);
+ String defReservationId = getReservationIdFromQueueName(planQueueName)
+ + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+ String defReservationQueue =
+ getReservationQueueName(planQueueName, defReservationId);
+ createDefaultReservationQueue(planQueueName, planQueue, defReservationId);
curReservationNames.add(defReservationId);
// if the resources dedicated to this plan has shrunk invoke replanner
boolean shouldResize = false;
@@ -149,10 +151,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// sort allocations from the one giving up the most resources, to the
// one asking for the most avoid order-of-operation errors that
// temporarily violate 100% capacity bound
- List<ReservationAllocation> sortedAllocations =
- sortByDelta(
- new ArrayList<ReservationAllocation>(currentReservations), now,
- plan);
+ List<ReservationAllocation> sortedAllocations = sortByDelta(
+ new ArrayList<ReservationAllocation>(currentReservations), now, plan);
for (ReservationAllocation res : sortedAllocations) {
String currResId = res.getReservationId().toString();
if (curReservationNames.contains(currResId)) {
@@ -163,10 +163,9 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
if (planResources.getMemorySize() > 0
&& planResources.getVirtualCores() > 0) {
if (shouldResize) {
- capToAssign =
- calculateReservationToPlanProportion(
- plan.getResourceCalculator(), planResources,
- reservedResources, capToAssign);
+ capToAssign = calculateReservationToPlanProportion(
+ plan.getResourceCalculator(), planResources, reservedResources,
+ capToAssign);
}
targetCapacity =
calculateReservationToPlanRatio(plan.getResourceCalculator(),
@@ -185,7 +184,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
maxCapacity = targetCapacity;
}
try {
- setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity);
+ setQueueEntitlement(planQueueName, currResId, targetCapacity,
+ maxCapacity);
} catch (YarnException e) {
LOG.warn("Exception while trying to size reservation for plan: {}",
currResId, planQueueName, e);
@@ -196,9 +196,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// compute the default queue capacity
float defQCap = 1.0f - totalAssignedCapacity;
if (LOG.isDebugEnabled()) {
- LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
- + "currReservation: {} default-queue capacity: {}", planResources,
- numRes, defQCap);
+ LOG.debug(
+ "PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+ + "currReservation: {} default-queue capacity: {}",
+ planResources, numRes, defQCap);
}
// set the default queue to eat-up all remaining capacity
try {
@@ -225,12 +226,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
protected void setQueueEntitlement(String planQueueName, String currResId,
- float targetCapacity,
- float maxCapacity) throws YarnException {
- String reservationQueueName = getReservationQueueName(planQueueName,
- currResId);
- scheduler.setEntitlement(reservationQueueName, new QueueEntitlement(
- targetCapacity, maxCapacity));
+ float targetCapacity, float maxCapacity) throws YarnException {
+ String reservationQueueName =
+ getReservationQueueName(planQueueName, currResId);
+ scheduler.setEntitlement(reservationQueueName,
+ new QueueEntitlement(targetCapacity, maxCapacity));
}
// Schedulers have different ways of naming queues. See YARN-2773
@@ -244,14 +244,21 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* Then move all apps in the set of queues to the parent plan queue's default
* reservation queue if move is enabled. Finally cleanups the queue by killing
* any apps (if move is disabled or move failed) and removing the queue
+ *
+ * @param planQueueName the name of {@code PlanQueue}
+ * @param shouldMove flag to indicate if any running apps should be moved or
+ * killed
+ * @param toRemove the remnant apps to clean up
+ * @param defReservationQueue the default {@code ReservationQueue} of the
+ * {@link Plan}
*/
- protected void cleanupExpiredQueues(String planQueueName,
- boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
+ protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove,
+ Set<String> toRemove, String defReservationQueue) {
for (String expiredReservationId : toRemove) {
try {
// reduce entitlement to 0
- String expiredReservation = getReservationQueueName(planQueueName,
- expiredReservationId);
+ String expiredReservation =
+ getReservationQueueName(planQueueName, expiredReservationId);
setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
if (shouldMove) {
moveAppsInQueueSync(expiredReservation, defReservationQueue);
@@ -275,7 +282,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* reservation queue in a synchronous fashion
*/
private void moveAppsInQueueSync(String expiredReservation,
- String defReservationQueue) {
+ String defReservationQueue) {
List<ApplicationAttemptId> activeApps =
scheduler.getAppsInQueue(expiredReservation);
if (activeApps.isEmpty()) {
@@ -287,16 +294,16 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
} catch (YarnException e) {
LOG.warn(
- "Encountered unexpected error during migration of application: {}" +
- " from reservation: {}",
+ "Encountered unexpected error during migration of application: {}"
+ + " from reservation: {}",
app, expiredReservation, e);
}
}
}
- protected int getReservedResources(long now, Set<ReservationAllocation>
- currentReservations, Set<String> curReservationNames,
- Resource reservedResources) {
+ protected int getReservedResources(long now,
+ Set<ReservationAllocation> currentReservations,
+ Set<String> curReservationNames, Resource reservedResources) {
int numRes = 0;
if (currentReservations != null) {
numRes = currentReservations.size();
@@ -312,23 +319,30 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
* Sort in the order from the least new amount of resources asked (likely
* negative) to the highest. This prevents "order-of-operation" errors related
* to exceeding 100% capacity temporarily.
+ *
+ * @param currentReservations the currently active reservations
+ * @param now the current time
+ * @param plan the {@link Plan} that is being considered
+ *
+ * @return the sorted list of {@link ReservationAllocation}s
*/
protected List<ReservationAllocation> sortByDelta(
List<ReservationAllocation> currentReservations, long now, Plan plan) {
- Collections.sort(currentReservations, new ReservationAllocationComparator(
- now, this, plan));
+ Collections.sort(currentReservations,
+ new ReservationAllocationComparator(now, this, plan));
return currentReservations;
}
/**
- * Get queue associated with reservable queue named
- * @param planQueueName Name of the reservable queue
+ * Get queue associated with reservable queue named.
+ *
+ * @param planQueueName name of the reservable queue
* @return queue associated with the reservable queue
*/
protected abstract Queue getPlanQueue(String planQueueName);
/**
- * Resizes reservations based on currently available resources
+ * Resizes reservations based on currently available resources.
*/
private Resource calculateReservationToPlanProportion(
ResourceCalculator rescCalculator, Resource availablePlanResources,
@@ -338,7 +352,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Calculates ratio of reservationResources to planResources
+ * Calculates ratio of reservationResources to planResources.
*/
private float calculateReservationToPlanRatio(
ResourceCalculator rescCalculator, Resource clusterResources,
@@ -348,7 +362,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Check if plan resources are less than expected reservation resources
+ * Check if plan resources are less than expected reservation resources.
*/
private boolean arePlanResourcesLessThanReservations(
ResourceCalculator rescCalculator, Resource clusterResources,
@@ -358,38 +372,56 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
/**
- * Get a list of reservation queues for this planQueue
+ * Get a list of reservation queues for this planQueue.
+ *
+ * @param planQueue the queue for the current {@link Plan}
+ *
+ * @return the queues corresponding to the reservations
*/
protected abstract List<? extends Queue> getChildReservationQueues(
Queue planQueue);
/**
- * Add a new reservation queue for reservation currResId for this planQueue
+ * Add a new reservation queue for reservation currResId for this planQueue.
*/
- protected abstract void addReservationQueue(
- String planQueueName, Queue queue, String currResId);
+ protected abstract void addReservationQueue(String planQueueName, Queue queue,
+ String currResId);
/**
- * Creates the default reservation queue for use when no reservation is
- * used for applications submitted to this planQueue
+ * Creates the default reservation queue for use when no reservation is used
+ * for applications submitted to this planQueue.
+ *
+ * @param planQueueName name of the reservable queue
+ * @param queue the queue for the current {@link Plan}
+ * @param defReservationQueue name of the default {@code ReservationQueue}
*/
- protected abstract void createDefaultReservationQueue(
- String planQueueName, Queue queue, String defReservationQueue);
+ protected abstract void createDefaultReservationQueue(String planQueueName,
+ Queue queue, String defReservationQueue);
/**
- * Get plan resources for this planQueue
+ * Get plan resources for this planQueue.
+ *
+ * @param plan the current {@link Plan} being considered
+ * @param clusterResources the resources available in the cluster
+ *
+ * @return the resources allocated to the specified {@link Plan}
*/
- protected abstract Resource getPlanResources(
- Plan plan, Queue queue, Resource clusterResources);
+ protected abstract Resource getPlanResources(Plan plan, Queue queue,
+ Resource clusterResources);
/**
* Get reservation queue resources if it exists otherwise return null.
+ *
+ * @param plan the current {@link Plan} being considered
+ * @param reservationId the identifier of the reservation
+ *
+ * @return the resources allocated to the specified reservation
*/
protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId);
- private static class ReservationAllocationComparator implements
- Comparator<ReservationAllocation> {
+ private static class ReservationAllocationComparator
+ implements Comparator<ReservationAllocation> {
AbstractSchedulerPlanFollower planFollower;
long now;
Plan plan;
@@ -404,14 +436,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
private Resource getUnallocatedReservedResources(
ReservationAllocation reservation) {
Resource resResource;
- Resource reservationResource = planFollower
- .getReservationQueueResourceIfExists
- (plan, reservation.getReservationId());
+ Resource reservationResource =
+ planFollower.getReservationQueueResourceIfExists(plan,
+ reservation.getReservationId());
if (reservationResource != null) {
- resResource =
- Resources.subtract(
- reservation.getResourcesAtTime(now),
- reservationResource);
+ resResource = Resources.subtract(reservation.getResourcesAtTime(now),
+ reservationResource);
} else {
resResource = reservation.getResourcesAtTime(now);
}
@@ -428,4 +458,3 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
}
}
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 783fd09..9eb1820 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,9 +33,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -64,9 +65,14 @@ public class InMemoryPlan implements Plan {
private RLESparseResourceAllocation rleSparseVector;
+ private PeriodicRLESparseResourceAllocation periodicRle;
+
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
+ private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc =
+ new HashMap<String, RLESparseResourceAllocation>();
+
private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
new HashMap<String, RLESparseResourceAllocation>();
@@ -96,15 +102,27 @@ public class InMemoryPlan implements Plan {
String queueName, Planner replanner, boolean getMoveOnExpiry,
RMContext rmContext) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
- maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
- new UTCClock());
+ maxAlloc, queueName, replanner, getMoveOnExpiry,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
+ rmContext);
}
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry,
- RMContext rmContext, Clock clock) {
+ long maxPeriodicty, RMContext rmContext) {
+ this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
+ maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicty,
+ rmContext, new UTCClock());
+ }
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+ ReservationAgent agent, Resource totalCapacity, long step,
+ ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
+ String queueName, Planner replanner, boolean getMoveOnExpiry,
+ long maxPeriodicty, RMContext rmContext, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
@@ -114,6 +132,8 @@ public class InMemoryPlan implements Plan {
this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
+ this.periodicRle =
+ new PeriodicRLESparseResourceAllocation(resCalc, maxPeriodicty);
this.queueName = queueName;
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
@@ -126,6 +146,39 @@ public class InMemoryPlan implements Plan {
return queueMetrics;
}
+ /**
+  * Returns the allocation tracker registered for {@code user}, first
+  * registering a new empty one when the user has no entry yet.
+  *
+  * @param user the user whose tracker is requested
+  * @param period reservation periodicity; a value greater than zero selects
+  *          the periodic per-user map, anything else the non-periodic one
+  * @return the tracker stored for the user in the selected map
+  */
+ private RLESparseResourceAllocation getUserRLEResourceAllocation(String user,
+     long period) {
+   // periodic reservations are tracked in their own per-user map, sized to
+   // the plan-wide periodic allocation's time period
+   if (period > 0) {
+     if (!userPeriodicResourceAlloc.containsKey(user)) {
+       userPeriodicResourceAlloc.put(user,
+           new PeriodicRLESparseResourceAllocation(resCalc,
+               periodicRle.getTimePeriod()));
+     }
+     return userPeriodicResourceAlloc.get(user);
+   }
+
+   // non-periodic reservations use a plain run-length encoded allocation
+   if (!userResourceAlloc.containsKey(user)) {
+     userResourceAlloc.put(user, new RLESparseResourceAllocation(resCalc));
+   }
+   return userResourceAlloc.get(user);
+ }
+
+ /**
+  * Removes the per-user allocation entry once it no longer holds anything.
+  *
+  * @param user the user whose entry should be checked
+  * @param period reservation periodicity; a value greater than zero selects
+  *          the periodic per-user map, anything else the non-periodic one
+  */
+ private void gcUserRLEResourceAllocation(String user, long period) {
+   // periodic and non-periodic allocations live in separate per-user maps
+   Map<String, RLESparseResourceAllocation> allocByUser =
+       (period > 0) ? userPeriodicResourceAlloc : userResourceAlloc;
+   RLESparseResourceAllocation resAlloc = allocByUser.get(user);
+   // a user without an entry has nothing to collect; guarding here avoids a
+   // NullPointerException from calling isEmpty() on a missing mapping
+   if (resAlloc != null && resAlloc.isEmpty()) {
+     allocByUser.remove(user);
+   }
+ }
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
@@ -133,11 +186,10 @@ public class InMemoryPlan implements Plan {
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
- RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
- if (resAlloc == null) {
- resAlloc = new RLESparseResourceAllocation(resCalc);
- userResourceAlloc.put(user, resAlloc);
- }
+ long period = reservation.getPeriodicity();
+ RLESparseResourceAllocation resAlloc =
+ getUserRLEResourceAllocation(user, period);
+
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
if (resCount == null) {
resCount = new RLESparseResourceAllocation(resCalc);
@@ -149,14 +201,43 @@ public class InMemoryPlan implements Plan {
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
- resAlloc.addInterval(r.getKey(), r.getValue());
- rleSparseVector.addInterval(r.getKey(), r.getValue());
- if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
- ZERO_RESOURCE)) {
- earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
- latestActive = Math.max(latestActive, r.getKey().getEndTime());
+
+ if (period > 0L) {
+ for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
+
+ long rStart = r.getKey().getStartTime() + i * period;
+ long rEnd = r.getKey().getEndTime() + i * period;
+
+ // handle wrap-around
+ if (rEnd > periodicRle.getTimePeriod()) {
+ long diff = rEnd - periodicRle.getTimePeriod();
+ rEnd = periodicRle.getTimePeriod();
+ ReservationInterval newInterval = new ReservationInterval(0, diff);
+ periodicRle.addInterval(newInterval, r.getValue());
+ resAlloc.addInterval(newInterval, r.getValue());
+ }
+
+ ReservationInterval newInterval =
+ new ReservationInterval(rStart, rEnd);
+ periodicRle.addInterval(newInterval, r.getValue());
+ resAlloc.addInterval(newInterval, r.getValue());
+ }
+
+ } else {
+ rleSparseVector.addInterval(r.getKey(), r.getValue());
+ resAlloc.addInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
+ }
}
}
+ // periodic reservations are active from start time and good till cancelled
+ if (period > 0L) {
+ earliestActive = reservation.getStartTime();
+ latestActive = Long.MAX_VALUE;
+ }
resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
Resource.newInstance(1, 1));
}
@@ -166,27 +247,55 @@ public class InMemoryPlan implements Plan {
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
- RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
+ long period = reservation.getPeriodicity();
+ RLESparseResourceAllocation resAlloc =
+ getUserRLEResourceAllocation(user, period);
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
- resAlloc.removeInterval(r.getKey(), r.getValue());
- rleSparseVector.removeInterval(r.getKey(), r.getValue());
- if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
- ZERO_RESOURCE)) {
- earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
- latestActive = Math.max(latestActive, r.getKey().getEndTime());
+ if (period > 0L) {
+ for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
+
+ long rStart = r.getKey().getStartTime() + i * period;
+ long rEnd = r.getKey().getEndTime() + i * period;
+
+ // handle wrap-around
+ if (rEnd > periodicRle.getTimePeriod()) {
+ long diff = rEnd - periodicRle.getTimePeriod();
+ rEnd = periodicRle.getTimePeriod();
+ ReservationInterval newInterval = new ReservationInterval(0, diff);
+ periodicRle.removeInterval(newInterval, r.getValue());
+ resAlloc.removeInterval(newInterval, r.getValue());
+ }
+
+ ReservationInterval newInterval =
+ new ReservationInterval(rStart, rEnd);
+ periodicRle.removeInterval(newInterval, r.getValue());
+ resAlloc.removeInterval(newInterval, r.getValue());
+ }
+ } else {
+ rleSparseVector.removeInterval(r.getKey(), r.getValue());
+ resAlloc.removeInterval(r.getKey(), r.getValue());
+ if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
+ ZERO_RESOURCE)) {
+ earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
+ latestActive = Math.max(latestActive, r.getKey().getEndTime());
+ }
}
}
- if (resAlloc.isEmpty()) {
- userResourceAlloc.remove(user);
- }
+ gcUserRLEResourceAllocation(user, period);
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
- resCount.removeInterval(new ReservationInterval(earliestActive,
- latestActive), Resource.newInstance(1, 1));
+ // periodic reservations are active from start time and good till cancelled
+ if (period > 0L) {
+ earliestActive = reservation.getStartTime();
+ latestActive = Long.MAX_VALUE;
+ }
+ resCount.removeInterval(
+ new ReservationInterval(earliestActive, latestActive),
+ Resource.newInstance(1, 1));
if (resCount.isEmpty()) {
userActiveReservationCount.remove(user);
}
@@ -198,9 +307,9 @@ public class InMemoryPlan implements Plan {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new TreeSet<ReservationAllocation>();
- for (Set<InMemoryReservationAllocation> reservationEntries :
- currentReservations.values()) {
- flattenedReservations.addAll(reservationEntries);
+ for (Set<InMemoryReservationAllocation> res : currentReservations
+ .values()) {
+ flattenedReservations.addAll(res);
}
return flattenedReservations;
} else {
@@ -218,19 +327,16 @@ public class InMemoryPlan implements Plan {
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
if (inMemReservation.getUser() == null) {
- String errMsg =
- "The specified Reservation with ID "
- + inMemReservation.getReservationId()
- + " is not mapped to any user";
+ String errMsg = "The specified Reservation with ID "
+ + inMemReservation.getReservationId() + " is not mapped to any user";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
writeLock.lock();
try {
if (reservationTable.containsKey(inMemReservation.getReservationId())) {
- String errMsg =
- "The specified Reservation with ID "
- + inMemReservation.getReservationId() + " already exists";
+ String errMsg = "The specified Reservation with ID "
+ + inMemReservation.getReservationId() + " already exists";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -246,9 +352,8 @@ public class InMemoryPlan implements Plan {
getQueueName(), inMemReservation.getReservationId().toString());
}
}
- ReservationInterval searchInterval =
- new ReservationInterval(inMemReservation.getStartTime(),
- inMemReservation.getEndTime());
+ ReservationInterval searchInterval = new ReservationInterval(
+ inMemReservation.getStartTime(), inMemReservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations == null) {
@@ -280,9 +385,8 @@ public class InMemoryPlan implements Plan {
ReservationId resId = reservation.getReservationId();
ReservationAllocation currReservation = getReservationById(resId);
if (currReservation == null) {
- String errMsg =
- "The specified Reservation with ID " + resId
- + " does not exist in the plan";
+ String errMsg = "The specified Reservation with ID " + resId
+ + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -318,9 +422,8 @@ public class InMemoryPlan implements Plan {
private boolean removeReservation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
- ReservationInterval searchInterval =
- new ReservationInterval(reservation.getStartTime(),
- reservation.getEndTime());
+ ReservationInterval searchInterval = new ReservationInterval(
+ reservation.getStartTime(), reservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
@@ -337,16 +440,15 @@ public class InMemoryPlan implements Plan {
currentReservations.remove(searchInterval);
}
} else {
- String errMsg =
- "The specified Reservation with ID " + reservation.getReservationId()
- + " does not exist in the plan";
+ String errMsg = "The specified Reservation with ID "
+ + reservation.getReservationId() + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
- reservation.getReservationId());
+ reservation.getReservationId());
return true;
}
@@ -356,9 +458,8 @@ public class InMemoryPlan implements Plan {
try {
ReservationAllocation reservation = getReservationById(reservationID);
if (reservation == null) {
- String errMsg =
- "The specified Reservation with ID " + reservationID
- + " does not exist in the plan";
+ String errMsg = "The specified Reservation with ID " + reservationID
+ + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
@@ -453,66 +554,90 @@ public class InMemoryPlan implements Plan {
long start, long end) {
readLock.lock();
try {
+ // merge periodic and non-periodic allocations
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
+ RLESparseResourceAllocation userPeriodicResAlloc =
+ userPeriodicResourceAlloc.get(user);
+ if (userResAlloc != null && userPeriodicResAlloc != null) {
+ return RLESparseResourceAllocation.merge(resCalc, totalCapacity,
+ userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end);
+ }
if (userResAlloc != null) {
return userResAlloc.getRangeOverlapping(start, end);
- } else {
- return new RLESparseResourceAllocation(resCalc);
}
+ if (userPeriodicResAlloc != null) {
+ return userPeriodicResAlloc.getRangeOverlapping(start, end);
+ }
+ } catch (PlanningException e) {
+ LOG.warn("Exception while trying to merge periodic"
+ + " and non-periodic user allocations: {}", e.getMessage(), e);
} finally {
readLock.unlock();
}
+ return new RLESparseResourceAllocation(resCalc);
}
@Override
public Resource getTotalCommittedResources(long t) {
readLock.lock();
try {
- return rleSparseVector.getCapacityAtTime(t);
+ return Resources.add(rleSparseVector.getCapacityAtTime(t),
+ periodicRle.getCapacityAtTime(t));
} finally {
readLock.unlock();
}
}
@Override
- public Set<ReservationAllocation> getReservations(ReservationId
- reservationID, ReservationInterval interval) {
+ public Set<ReservationAllocation> getReservations(ReservationId reservationID,
+ ReservationInterval interval) {
return getReservations(reservationID, interval, null);
}
@Override
- public Set<ReservationAllocation> getReservations(ReservationId
- reservationID, ReservationInterval interval, String user) {
+ public Set<ReservationAllocation> getReservations(ReservationId reservationID,
+ ReservationInterval interval, String user) {
if (reservationID != null) {
ReservationAllocation allocation = getReservationById(reservationID);
- if (allocation == null){
+ if (allocation == null) {
return Collections.emptySet();
}
return Collections.singleton(allocation);
}
- long startTime = interval == null? 0 : interval.getStartTime();
- long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime();
+ long startTime = interval == null ? 0 : interval.getStartTime();
+ long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime();
ReservationInterval searchInterval =
- new ReservationInterval(endTime, Long.MAX_VALUE);
+ new ReservationInterval(endTime, Long.MAX_VALUE);
readLock.lock();
try {
- SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>>
- reservations = currentReservations.headMap(searchInterval, true);
- if (!reservations.isEmpty()) {
- Set<ReservationAllocation> flattenedReservations =
- new HashSet<>();
- for (Set<InMemoryReservationAllocation> reservationEntries :
- reservations.values()) {
- for (InMemoryReservationAllocation res : reservationEntries) {
- if (res.getEndTime() > startTime) {
- if (user != null && !user.isEmpty()
- && !res.getUser().equals(user)) {
- continue;
+ SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res =
+ currentReservations.headMap(searchInterval, true);
+ if (!res.isEmpty()) {
+ Set<ReservationAllocation> flattenedReservations = new HashSet<>();
+ for (Set<InMemoryReservationAllocation> resEntries : res.values()) {
+ for (InMemoryReservationAllocation reservation : resEntries) {
+ // validate user
+ if (user != null && !user.isEmpty()
+ && !reservation.getUser().equals(user)) {
+ continue;
+ }
+ // handle periodic reservations
+ long period = reservation.getPeriodicity();
+ if (period > 0) {
+ long t = endTime % period;
+ // check for both contained and wrap-around reservations
+ if ((t - startTime) * (t - endTime)
+ * (startTime - endTime) >= 0) {
+ flattenedReservations.add(reservation);
+ }
+ } else {
+ // check for non-periodic reservations
+ if (reservation.getEndTime() > startTime) {
+ flattenedReservations.add(reservation);
}
- flattenedReservations.add(res);
}
}
}
@@ -550,36 +675,82 @@ public class InMemoryPlan implements Plan {
@Override
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
- ReservationId oldId, long start, long end) throws PlanningException {
+ ReservationId oldId, long start, long end, long period)
+ throws PlanningException {
readLock.lock();
try {
- // create RLE of totCapacity
- TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
- totAvailable.put(start, Resources.clone(totalCapacity));
- RLESparseResourceAllocation totRLEAvail =
- new RLESparseResourceAllocation(totAvailable, resCalc);
-
- // subtract used from available
- RLESparseResourceAllocation netAvailable;
-
- netAvailable =
- RLESparseResourceAllocation.merge(resCalc,
- Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
- RLEOperator.subtractTestNonNegative, start, end);
-
- // add back in old reservation used resources if any
- ReservationAllocation old = reservationTable.get(oldId);
- if (old != null) {
- netAvailable =
- RLESparseResourceAllocation.merge(resCalc,
- Resources.clone(totalCapacity), netAvailable,
- old.getResourcesOverTime(), RLEOperator.add, start, end);
+
+ // for non-periodic return simple available resources
+ if (period == 0) {
+
+ // create RLE of totCapacity
+ TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
+ totAvailable.put(start, Resources.clone(totalCapacity));
+ RLESparseResourceAllocation totRLEAvail =
+ new RLESparseResourceAllocation(totAvailable, resCalc);
+
+ // subtract used from available
+ RLESparseResourceAllocation netAvailable;
+
+ netAvailable = RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
+ RLEOperator.subtractTestNonNegative, start, end);
+
+ // remove periodic component
+ netAvailable = RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), netAvailable, periodicRle,
+ RLEOperator.subtractTestNonNegative, start, end);
+
+ // add back in old reservation used resources if any
+ ReservationAllocation old = reservationTable.get(oldId);
+ if (old != null) {
+
+ RLESparseResourceAllocation addBackPrevious =
+ old.getResourcesOverTime(start, end);
+ netAvailable = RLESparseResourceAllocation.merge(resCalc,
+ Resources.clone(totalCapacity), netAvailable, addBackPrevious,
+ RLEOperator.add, start, end);
+ }
+ // lower it if this is needed by the sharing policy
+ netAvailable = getSharingPolicy().availableResources(netAvailable, this,
+ user, oldId, start, end);
+ return netAvailable;
+ } else {
+
+ // the requested period must divide the plan's periodic time period
+ // evenly, otherwise the request is rejected
+ if (periodicRle.getTimePeriod() % period != 0) {
+ throw new PlanningException("The reservation periodicity (" + period
+ + ") must be an exact divider of the system maxPeriod ("
+ + periodicRle.getTimePeriod() + ")");
+ }
+
+ // find the minimum resources available among all the instances that fit
+ // in the LCM
+ long numInstInLCM = periodicRle.getTimePeriod() / period;
+
+ RLESparseResourceAllocation minOverLCM =
+ getAvailableResourceOverTime(user, oldId, start, end, 0);
+ for (int i = 1; i < numInstInLCM; i++) {
+
+ long rStart = start + i * period;
+ long rEnd = end + i * period;
+
+ // recursive invocation of non-periodic range (to pick raw-info)
+ RLESparseResourceAllocation snapShot =
+ getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0);
+
+ // time-align on start
+ snapShot.shift(-(i * period));
+
+ // pick the minimum amount of resources in each time interval
+ minOverLCM =
+ RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(),
+ minOverLCM, snapShot, RLEOperator.min, start, end);
+
+ }
+
+ return minOverLCM;
+
}
- // lower it if this is needed by the sharing policy
- netAvailable =
- getSharingPolicy().availableResources(netAvailable, this, user,
- oldId, start, end);
- return netAvailable;
} finally {
readLock.unlock();
}
@@ -637,7 +808,7 @@ public class InMemoryPlan implements Plan {
public String toCumulativeString() {
readLock.lock();
try {
- return rleSparseVector.toString();
+ return rleSparseVector.toString() + "\n" + periodicRle.toString();
} finally {
readLock.unlock();
}
@@ -689,11 +860,18 @@ public class InMemoryPlan implements Plan {
}
@Override
- public RLESparseResourceAllocation getCumulativeLoadOverTime(
- long start, long end) {
+ public RLESparseResourceAllocation getCumulativeLoadOverTime(long start,
+ long end) throws PlanningException {
readLock.lock();
try {
- return rleSparseVector.getRangeOverlapping(start, end);
+
+ RLESparseResourceAllocation ret =
+ rleSparseVector.getRangeOverlapping(start, end);
+ ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret,
+ periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start,
+ end);
+
+ return ret;
} finally {
readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index 69fd43f..00c8e44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -42,6 +42,7 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
private final Map<ReservationInterval, Resource> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;
+ private long periodicity = 0;
private RLESparseResourceAllocation resourcesOverTime;
@@ -67,9 +68,16 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
this.allocationRequests = allocations;
this.planName = planName;
this.hasGang = hasGang;
- resourcesOverTime = new RLESparseResourceAllocation(calculator);
- for (Map.Entry<ReservationInterval, Resource> r : allocations
- .entrySet()) {
+ if (contract != null && contract.getRecurrenceExpression() != null) {
+ this.periodicity = Long.parseLong(contract.getRecurrenceExpression());
+ }
+ if (periodicity > 0) {
+ resourcesOverTime =
+ new PeriodicRLESparseResourceAllocation(calculator, periodicity);
+ } else {
+ resourcesOverTime = new RLESparseResourceAllocation(calculator);
+ }
+ for (Map.Entry<ReservationInterval, Resource> r : allocations.entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
}
}
@@ -133,17 +141,33 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
}
@Override
- public RLESparseResourceAllocation getResourcesOverTime(){
+ public RLESparseResourceAllocation getResourcesOverTime() {
return resourcesOverTime;
}
@Override
+ public RLESparseResourceAllocation getResourcesOverTime(long start,
+ long end) {
+ // hand back only the part of this reservation's allocation that
+ // getRangeOverlapping selects for the given start and end
+ return resourcesOverTime.getRangeOverlapping(start, end);
+ }
+
+ /**
+ * Returns the periodicity currently recorded for this reservation. The
+ * field is initialised to 0 and the constructor builds a periodic
+ * allocation only when the value is greater than 0.
+ */
+ @Override
+ public long getPeriodicity() {
+ return periodicity;
+ }
+
+ /**
+ * Overwrites the recorded periodicity. Only the field is updated here; the
+ * resourcesOverTime allocation is not rebuilt by this call.
+ *
+ * @param period the new periodicity value to store
+ */
+ @Override
+ public void setPeriodicity(long period) {
+ periodicity = period;
+ }
+
+ /**
+ * Renders the reservation id, user, start time, end time, periodicity and
+ * the allocation over time as a single human-readable string.
+ */
+ @Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
- .append(getEndTime()).append(" alloc:\n[")
- .append(resourcesOverTime.toString()).append("] ");
+ .append(getEndTime()).append(" Periodicity: ").append(periodicity)
+ .append(" alloc:\n[").append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
index 55f1d00..49d4702 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java
@@ -40,7 +40,7 @@ public class NoOverCommitPolicy implements SharingPolicy {
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
reservation.getUser(), reservation.getReservationId(),
- reservation.getStartTime(), reservation.getEndTime());
+ reservation.getStartTime(), reservation.getEndTime(), 0);
// test the reservation does not exceed what is available
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
index 8e3be8b3..7bc44f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
@@ -18,47 +18,94 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
- * This data structure stores a periodic RLESparseResourceAllocation.
+ * This data structure stores a periodic {@link RLESparseResourceAllocation}.
* Default period is 1 day (86400000ms).
*/
-public class PeriodicRLESparseResourceAllocation extends
- RLESparseResourceAllocation {
+public class PeriodicRLESparseResourceAllocation
+ extends RLESparseResourceAllocation {
// Log
- private static final Logger LOG = LoggerFactory
- .getLogger(PeriodicRLESparseResourceAllocation.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class);
private long timePeriod;
/**
* Constructor.
*
- * @param rleVector {@link RLESparseResourceAllocation} with the run-length
- encoded data.
+ * @param resourceCalculator {@link ResourceCalculator} the resource
+ * calculator to use.
* @param timePeriod Time period in milliseconds.
*/
public PeriodicRLESparseResourceAllocation(
- RLESparseResourceAllocation rleVector, Long timePeriod) {
- super(rleVector.getCumulative(), rleVector.getResourceCalculator());
+ ResourceCalculator resourceCalculator, Long timePeriod) {
+ super(resourceCalculator);
this.timePeriod = timePeriod;
}
/**
* Constructor. Default time period set to 1 day.
*
+ * @param resourceCalculator {@link ResourceCalculator} the resource
+ * calculator to use.
+ */
+ public PeriodicRLESparseResourceAllocation(
+ ResourceCalculator resourceCalculator) {
+ this(resourceCalculator,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
+ }
+
+ /**
+ * Constructor.
+ *
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
- encoded data.
+ * encoded data.
+ * @param timePeriod Time period in milliseconds.
*/
+ @VisibleForTesting
public PeriodicRLESparseResourceAllocation(
- RLESparseResourceAllocation rleVector) {
- this(rleVector, 86400000L);
+ RLESparseResourceAllocation rleVector, Long timePeriod) {
+ super(rleVector.getCumulative(), rleVector.getResourceCalculator());
+ this.timePeriod = timePeriod;
+
+ // make sure the PeriodicRLE is zero-based, and handles wrap-around
+ long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime());
+ shift(delta);
+
+ List<Long> toRemove = new ArrayList<>();
+ Map<Long, Resource> toAdd = new TreeMap<>();
+
+ for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+ if (entry.getKey() > timePeriod) {
+ toRemove.add(entry.getKey());
+ if (entry.getValue() != null) {
+ toAdd.put(timePeriod, entry.getValue());
+ long prev = entry.getKey() % timePeriod;
+ toAdd.put(prev, this.getCapacityAtTime(prev));
+ toAdd.put(0L, entry.getValue());
+ }
+ }
+ }
+ for (Long l : toRemove) {
+ cumulativeCapacity.remove(l);
+ }
+ cumulativeCapacity.putAll(toAdd);
}
/**
@@ -78,24 +125,25 @@ public class PeriodicRLESparseResourceAllocation extends
* The interval may include 0, but the end time must be strictly less than
* timePeriod.
*
- * @param interval {@link ReservationInterval} to which the specified
- * resource is to be added.
+ * @param interval {@link ReservationInterval} to which the specified resource
+ * is to be added.
* @param resource {@link Resource} to be added to the interval specified.
* @return true if addition is successful, false otherwise
*/
- public boolean addInterval(ReservationInterval interval,
- Resource resource) {
+ public boolean addInterval(ReservationInterval interval, Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
+
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.addInterval(interval, resource);
} else {
- LOG.info("Cannot set capacity beyond end time: " + timePeriod);
+ LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was ("
+ + interval.toString() + ")");
return false;
}
}
- /**
+ /**
* Removes a resource for the specified interval.
*
* @param interval the {@link ReservationInterval} for which the resource is
@@ -103,14 +151,15 @@ public class PeriodicRLESparseResourceAllocation extends
* @param resource the {@link Resource} to be removed.
* @return true if removal is successful, false otherwise
*/
- public boolean removeInterval(
- ReservationInterval interval, Resource resource) {
+ public boolean removeInterval(ReservationInterval interval,
+ Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
// If the resource to be subtracted is less than the minimum resource in
// the range, abort removal to avoid negative capacity.
- if (!Resources.fitsIn(
- resource, super.getMinimumCapacityInInterval(interval))) {
+ // TODO revisit decrementing endTime
+ if (!Resources.fitsIn(resource, getMinimumCapacityInInterval(
+ new ReservationInterval(startTime, endTime - 1)))) {
LOG.info("Request to remove more resources than what is available");
return false;
}
@@ -125,17 +174,16 @@ public class PeriodicRLESparseResourceAllocation extends
/**
* Get maximum capacity at periodic offsets from the specified time.
*
- * @param tick UTC time base from which offsets are specified for finding
- * the maximum capacity.
- * @param period periodic offset at which capacities are evaluted.
+ * @param tick UTC time base from which offsets are specified for finding the
+ * maximum capacity.
+ * @param period periodic offset at which capacities are evaluated.
* @return the maximum {@link Resource} across the specified time instants.
* @return true if removal is successful, false otherwise
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxResource;
if (period < timePeriod) {
- maxResource =
- super.getMaximumPeriodicCapacity(tick % timePeriod, period);
+ maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period);
} else {
// if period is greater than the length of PeriodicRLESparseAllocation,
// only a single value exists in this interval.
@@ -164,4 +212,30 @@ public class PeriodicRLESparseResourceAllocation extends
return ret.toString();
}
+ @Override
+ public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
+ NavigableMap<Long, Resource> unrolledMap = new TreeMap<>();
+ readLock.lock();
+ try {
+ long relativeStart = (start >= 0) ? start % timePeriod : 0;
+ NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
+ Long previous = cumulativeMap.floorKey(relativeStart);
+ previous = (previous != null) ? previous : 0;
+ for (long i = 0; i <= (end - start) / timePeriod; i++) {
+ for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
+ long curKey = e.getKey() + (i * timePeriod);
+ if (curKey >= previous && (start + curKey - relativeStart) <= end) {
+ unrolledMap.put(curKey, e.getValue());
+ }
+ }
+ }
+ RLESparseResourceAllocation rle =
+ new RLESparseResourceAllocation(unrolledMap, getResourceCalculator());
+ rle.shift(start - relativeStart);
+ return rle;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
index 504a250..9afa324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -28,54 +28,58 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
public interface PlanEdit extends PlanContext, PlanView {
/**
- * Add a new {@link ReservationAllocation} to the plan
+ * Add a new {@link ReservationAllocation} to the plan.
*
* @param reservation the {@link ReservationAllocation} to be added to the
* plan
* @param isRecovering flag to indicate if reservation is being added as part
* of failover or not
* @return true if addition is successful, false otherwise
+ * @throws PlanningException if addition is unsuccessful
*/
- public boolean addReservation(ReservationAllocation reservation,
+ boolean addReservation(ReservationAllocation reservation,
boolean isRecovering) throws PlanningException;
/**
* Updates an existing {@link ReservationAllocation} in the plan. This is
- * required for re-negotiation
+ * required for re-negotiation.
*
* @param reservation the {@link ReservationAllocation} to be updated the plan
* @return true if update is successful, false otherwise
+ * @throws PlanningException if update is unsuccessful
*/
- public boolean updateReservation(ReservationAllocation reservation)
+ boolean updateReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Delete an existing {@link ReservationAllocation} from the plan identified
* uniquely by its {@link ReservationId}. This will generally be used for
- * garbage collection
+ * garbage collection.
*
* @param reservationID the {@link ReservationAllocation} to be deleted from
* the plan identified uniquely by its {@link ReservationId}
* @return true if delete is successful, false otherwise
+ * @throws PlanningException if deletion is unsuccessful
*/
- public boolean deleteReservation(ReservationId reservationID)
+ boolean deleteReservation(ReservationId reservationID)
throws PlanningException;
/**
* Method invoked to garbage collect old reservations. It cleans up expired
- * reservations that have fallen out of the sliding archival window
+ * reservations that have fallen out of the sliding archival window.
*
* @param tick the current time from which the archival window is computed
+ * @throws PlanningException if archival is unsuccessful
*/
- public void archiveCompletedReservations(long tick) throws PlanningException;
+ void archiveCompletedReservations(long tick) throws PlanningException;
/**
* Sets the overall capacity in terms of {@link Resource} assigned to this
- * plan
+ * plan.
*
* @param capacity the overall capacity in terms of {@link Resource} assigned
* to this plan
*/
- public void setTotalCapacity(Resource capacity);
+ void setTotalCapacity(Resource capacity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7996eca7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 2767993..4035f68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -17,50 +17,50 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.util.Set;
+
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import java.util.Set;
-
/**
* This interface provides a read-only view on the allocations made in this
* plan. These methods are used for example by {@code ReservationAgent}s to
* determine the free resources in a certain point in time, and by
* PlanFollowerPolicy to publish this plan to the scheduler.
*/
-public interface PlanView extends PlanContext {
+interface PlanView extends PlanContext {
/**
* Return a set of {@link ReservationAllocation} identified by the user who
* made the reservation.
*
* @param reservationID the unique id to identify the
- * {@link ReservationAllocation}
+ * {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
- * allocations from. Only reservations with start time no
- * greater than the interval end time, and end time no less
- * than the interval start time will be selected.
+ * allocations from. Only reservations with start time no greater
+ * than the interval end time, and end time no less than the interval
+ * start time will be selected.
* @param user the user to retrieve the reservation allocation from.
* @return a set of {@link ReservationAllocation} identified by the user who
- * made the reservation
+ * made the reservation
*/
- Set<ReservationAllocation> getReservations(ReservationId
- reservationID, ReservationInterval interval, String user);
+ Set<ReservationAllocation> getReservations(ReservationId reservationID,
+ ReservationInterval interval, String user);
/**
* Return a set of {@link ReservationAllocation} identified by any user.
*
* @param reservationID the unique id to identify the
- * {@link ReservationAllocation}
+ * {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
- * allocations from. Only reservations with start time no
- * greater than the interval end time, and end time no less
- * than the interval start time will be selected.
+ * allocations from. Only reservations with start time no greater
+ * than the interval end time, and end time no less than the interval
+ * start time will be selected.
* @return a set of {@link ReservationAllocation} identified by any user
*/
Set<ReservationAllocation> getReservations(ReservationId reservationID,
- ReservationInterval interval);
+ ReservationInterval interval);
/**
* Return a {@link ReservationAllocation} identified by its
@@ -70,7 +70,7 @@ public interface PlanView extends PlanContext {
* {@link ReservationAllocation}
* @return {@link ReservationAllocation} identified by the specified id
*/
- public ReservationAllocation getReservationById(ReservationId reservationID);
+ ReservationAllocation getReservationById(ReservationId reservationID);
/**
* Return a set of {@link ReservationAllocation} that belongs to a certain
@@ -78,11 +78,10 @@ public interface PlanView extends PlanContext {
*
* @param user the user being considered
* @param t the instant in time being considered
- * @return set of active {@link ReservationAllocation}s for this
- * user at this time
+ * @return set of active {@link ReservationAllocation}s for this user at this
+ * time
*/
- public Set<ReservationAllocation> getReservationByUserAtTime(String user,
- long t);
+ Set<ReservationAllocation> getReservationByUserAtTime(String user, long t);
/**
* Gets all the active reservations at the specified point of time
@@ -91,14 +90,14 @@ public interface PlanView extends PlanContext {
* requested
* @return set of active reservations at the specified time
*/
- public Set<ReservationAllocation> getReservationsAtTime(long tick);
+ Set<ReservationAllocation> getReservationsAtTime(long tick);
/**
* Gets all the reservations in the plan
*
* @return set of all reservations handled by this Plan
*/
- public Set<ReservationAllocation> getAllReservations();
+ Set<ReservationAllocation> getAllReservations();
/**
* Returns the total {@link Resource} reserved for all users at the specified
@@ -126,61 +125,68 @@ public interface PlanView extends PlanContext {
*
* @return the time (UTC in ms) at which the first reservation starts
*/
- public long getEarliestStartTime();
+ long getEarliestStartTime();
/**
* Returns the time (UTC in ms) at which the last reservation terminates
*
* @return the time (UTC in ms) at which the last reservation terminates
*/
- public long getLastEndTime();
+ long getLastEndTime();
/**
* This method returns the amount of resources available to a given user
* (optionally if removing a certain reservation) over the start-end time
- * range.
+ * range. If the request is periodic (period is non-zero) we return the
+ * minimum amount of resources available to periodic reservations (in all
+ * "period" windows within the system maxPeriod / LCM).
*
- * @param user
- * @param oldId
- * @param start
- * @param end
+ * @param user the user being considered
+ * @param oldId the identifier of the existing reservation
+ * @param start start of the time interval.
+ * @param end end of the time interval.
+ * @param period the ms periodicity for this request (loop and pick min till
+ * maxPeriodicity)
* @return a view of the plan as it is available to this user
- * @throws PlanningException
+ * @throws PlanningException if operation is unsuccessful
*/
- public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
- ReservationId oldId, long start, long end) throws PlanningException;
+ RLESparseResourceAllocation getAvailableResourceOverTime(String user,
+ ReservationId oldId, long start, long end, long period)
+ throws PlanningException;
/**
* This method returns a RLE encoded view of the user reservation count
* utilization between start and end time.
*
- * @param user
- * @param start
- * @param end
+ * @param user the user being considered
+ * @param start start of the time interval.
+ * @param end end of the time interval.
* @return RLE encoded view of reservation used over time
*/
- public RLESparseResourceAllocation getReservationCountForUserOverTime(
- String user, long start, long end);
+ RLESparseResourceAllocation getReservationCountForUserOverTime(String user,
+ long start, long end);
/**
* This method returns a RLE encoded view of the user reservation utilization
* between start and end time.
*
- * @param user
- * @param start
- * @param end
+ * @param user the user being considered
+ * @param start start of the time interval.
+ * @param end end of the time interval.
* @return RLE encoded view of resources used over time
*/
- public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
+ RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end);
/**
* Get the cumulative load over a time interval.
*
- * @param start Start of the time interval.
- * @param end End of the time interval.
+ * @param start start of the time interval.
+ * @param end end of the time interval.
* @return RLE sparse allocation.
+ * @throws PlanningException if operation is unsuccessful
*/
- RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
+ RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end)
+ throws PlanningException;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org