You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/04 02:10:07 UTC
[09/16] git commit: YARN-1712. Plan follower that synchronizes the
current state of reservation subsystem with the scheduler. Contributed by
Subru Krishnan and Carlo Curino. (cherry picked from commit
169085319b8b76641f8b9f6840a3fef06d221e2b)
YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit 169085319b8b76641f8b9f6840a3fef06d221e2b)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3418c56b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3418c56b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3418c56b
Branch: refs/heads/trunk
Commit: 3418c56bcf4bbddaf483bdaa1a15a8bbc4039bfe
Parents: b6df0dd
Author: subru <su...@outlook.com>
Authored: Tue Sep 16 16:45:45 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Oct 3 15:42:10 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 7 +-
.../CapacitySchedulerPlanFollower.java | 367 +++++++++++++++++++
.../reservation/PlanFollower.java | 67 ++++
.../TestCapacitySchedulerPlanFollower.java | 319 ++++++++++++++++
4 files changed, 758 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index e9ec691..56b3c12 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -5,10 +5,10 @@ YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting
-reservations. (Carlo Curino and Subru Krishnan via subru)
+reservations. (Subru Krishnan and Carlo Curino via subru)
YARN-1709. In-memory data structures used to track resources over
-time to enable reservations. (Carlo Curino and Subru Krishnan via
+time to enable reservations. (Subru Krishnan and Carlo Curino via
subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
@@ -17,3 +17,6 @@ curino)
YARN-1711. Policy to enforce instantaneous and over-time quotas
on user reservations. (Carlo Curino and Subru Krishnan via curino)
+
+YARN-1712. Plan follower that synchronizes the current state of reservation
+subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..cfa172c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a {@link PlanFollower}. It is invoked on a timer and
+ * is in charge of publishing the state of the {@link Plan}s to the underlying
+ * {@link CapacityScheduler}. This implementation does so by
+ * adding/removing/resizing leaf queues in the scheduler, thus affecting the
+ * dynamic behavior of the scheduler in a way that is consistent with the
+ * content of the plan. It also updates the plan's view of how many resources
+ * are available in the cluster.
+ *
+ * This implementation of PlanFollower is relatively stateless, and it can
+ * synchronize schedulers and Plans that have arbitrary changes (performing set
+ * differences among existing queues). This makes it resilient to frequency of
+ * synchronization, and RM restart issues (no "catch up" is necessary).
+ */
+public class CapacitySchedulerPlanFollower implements PlanFollower {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CapacitySchedulerPlanFollower.class);
+
+ private Collection<Plan> plans = new ArrayList<Plan>();
+
+ private Clock clock;
+ private CapacityScheduler scheduler;
+
+ /**
+  * Configures this follower with the clock, the scheduler and the plans it
+  * should keep synchronized.
+  *
+  * @param clock source of the time used when synchronizing a plan
+  * @param sched the scheduler to drive; must be a {@link CapacityScheduler}
+  * @param plans the plans to add to the set synchronized by {@link #run()}
+  * @throws YarnRuntimeException if {@code sched} is not a CapacityScheduler
+  */
+ @Override
+ public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+   final String policyName = this.getClass().getCanonicalName();
+   LOG.info("Initializing Plan Follower Policy:" + policyName);
+   final boolean supported = sched instanceof CapacityScheduler;
+   if (!supported) {
+     throw new YarnRuntimeException(
+         "CapacitySchedulerPlanFollower can only work with CapacityScheduler");
+   }
+   this.scheduler = (CapacityScheduler) sched;
+   this.clock = clock;
+   this.plans.addAll(plans);
+ }
+
+ /**
+  * Timer entry point: synchronizes every tracked plan, one after the other,
+  * while holding this follower's monitor.
+  */
+ @Override
+ public synchronized void run() {
+   for (Plan trackedPlan : plans) {
+     synchronizePlan(trackedPlan);
+   }
+ }
+
+ @Override
+ public synchronized void synchronizePlan(Plan plan) {
+ String planQueueName = plan.getQueueName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
+ }
+ // align with plan step
+ long step = plan.getStep();
+ long now = clock.getTime();
+ if (now % step != 0) {
+ now += step - (now % step);
+ }
+ CSQueue queue = scheduler.getQueue(planQueueName);
+ if (!(queue instanceof PlanQueue)) {
+ LOG.error("The Plan is not an PlanQueue!");
+ return;
+ }
+ PlanQueue planQueue = (PlanQueue) queue;
+ // first we publish to the plan the current availability of resources
+ Resource clusterResources = scheduler.getClusterResource();
+ float planAbsCap = planQueue.getAbsoluteCapacity();
+ Resource planResources = Resources.multiply(clusterResources, planAbsCap);
+ plan.setTotalCapacity(planResources);
+
+ Set<ReservationAllocation> currentReservations =
+ plan.getReservationsAtTime(now);
+ Set<String> curReservationNames = new HashSet<String>();
+ Resource reservedResources = Resource.newInstance(0, 0);
+ int numRes = 0;
+ if (currentReservations != null) {
+ numRes = currentReservations.size();
+ for (ReservationAllocation reservation : currentReservations) {
+ curReservationNames.add(reservation.getReservationId().toString());
+ Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
+ }
+ }
+ // create the default reservation queue if it doesnt exist
+ String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+ if (scheduler.getQueue(defReservationQueue) == null) {
+ ReservationQueue defQueue =
+ new ReservationQueue(scheduler, defReservationQueue, planQueue);
+ try {
+ scheduler.addQueue(defQueue);
+ } catch (SchedulerDynamicEditException e) {
+ LOG.warn(
+ "Exception while trying to create default reservation queue for plan: {}",
+ planQueueName, e);
+ }
+ }
+ curReservationNames.add(defReservationQueue);
+ // if the resources dedicated to this plan has shrunk invoke replanner
+ if (Resources.greaterThan(scheduler.getResourceCalculator(),
+ clusterResources, reservedResources, planResources)) {
+ try {
+ plan.getReplanner().plan(plan, null);
+ } catch (PlanningException e) {
+ LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+ }
+ }
+ // identify the reservations that have expired and new reservations that
+ // have to be activated
+ List<CSQueue> resQueues = planQueue.getChildQueues();
+ Set<String> expired = new HashSet<String>();
+ for (CSQueue resQueue : resQueues) {
+ String resQueueName = resQueue.getQueueName();
+ if (curReservationNames.contains(resQueueName)) {
+ // it is already existing reservation, so needed not create new
+ // reservation queue
+ curReservationNames.remove(resQueueName);
+ } else {
+ // the reservation has termination, mark for cleanup
+ expired.add(resQueueName);
+ }
+ }
+ // garbage collect expired reservations
+ cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
+
+ // Add new reservations and update existing ones
+ float totalAssignedCapacity = 0f;
+ if (currentReservations != null) {
+ // first release all excess capacity in default queue
+ try {
+ scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
+ 1.0f));
+ } catch (YarnException e) {
+ LOG.warn(
+ "Exception while trying to release default queue capacity for plan: {}",
+ planQueueName, e);
+ }
+ // sort allocations from the one giving up the most resources, to the
+ // one asking for the most
+ // avoid order-of-operation errors that temporarily violate 100%
+ // capacity bound
+ List<ReservationAllocation> sortedAllocations =
+ sortByDelta(
+ new ArrayList<ReservationAllocation>(currentReservations), now);
+ for (ReservationAllocation res : sortedAllocations) {
+ String currResId = res.getReservationId().toString();
+ if (curReservationNames.contains(currResId)) {
+ ReservationQueue resQueue =
+ new ReservationQueue(scheduler, currResId, planQueue);
+ try {
+ scheduler.addQueue(resQueue);
+ } catch (SchedulerDynamicEditException e) {
+ LOG.warn(
+ "Exception while trying to activate reservation: {} for plan: {}",
+ currResId, planQueueName, e);
+ }
+ }
+ Resource capToAssign = res.getResourcesAtTime(now);
+ float targetCapacity = 0f;
+ if (planResources.getMemory() > 0
+ && planResources.getVirtualCores() > 0) {
+ targetCapacity =
+ Resources.divide(scheduler.getResourceCalculator(),
+ clusterResources, capToAssign, planResources);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Assigning capacity of {} to queue {} with target capacity {}",
+ capToAssign, currResId, targetCapacity);
+ }
+ // set maxCapacity to 100% unless the job requires gang, in which
+ // case we stick to capacity (as running early/before is likely a
+ // waste of resources)
+ float maxCapacity = 1.0f;
+ if (res.containsGangs()) {
+ maxCapacity = targetCapacity;
+ }
+ try {
+ scheduler.setEntitlement(currResId, new QueueEntitlement(
+ targetCapacity, maxCapacity));
+ } catch (YarnException e) {
+ LOG.warn("Exception while trying to size reservation for plan: {}",
+ currResId, planQueueName, e);
+ }
+ totalAssignedCapacity += targetCapacity;
+ }
+ }
+ // compute the default queue capacity
+ float defQCap = 1.0f - totalAssignedCapacity;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+ + "currReservation: {} default-queue capacity: {}", planResources,
+ numRes, defQCap);
+ }
+ // set the default queue to eat-up all remaining capacity
+ try {
+ scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
+ defQCap, 1.0f));
+ } catch (YarnException e) {
+ LOG.warn(
+ "Exception while trying to reclaim default queue capacity for plan: {}",
+ planQueueName, e);
+ }
+ // garbage collect finished reservations from plan
+ try {
+ plan.archiveCompletedReservations(now);
+ } catch (PlanningException e) {
+ LOG.error("Exception in archiving completed reservations: ", e);
+ }
+ LOG.info("Finished iteration of plan follower edit policy for plan: "
+ + planQueueName);
+
+ // Extension: update plan with app states,
+ // useful to support smart replanning
+ }
+
+ /**
+ * Move all apps in the set of queues to the parent plan queue's default
+ * reservation queue in a synchronous fashion
+ */
+ private void moveAppsInQueueSync(String expiredReservation,
+ String defReservationQueue) {
+ List<ApplicationAttemptId> activeApps =
+ scheduler.getAppsInQueue(expiredReservation);
+ if (activeApps.isEmpty()) {
+ return;
+ }
+ for (ApplicationAttemptId app : activeApps) {
+ // fallback to parent's default queue
+ try {
+ scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
+ } catch (YarnException e) {
+ LOG.warn(
+ "Encountered unexpected error during migration of application: {} from reservation: {}",
+ app, expiredReservation, e);
+ }
+ }
+ }
+
+ /**
+ * First sets entitlement of queues to zero to prevent new app submission.
+ * Then move all apps in the set of queues to the parent plan queue's default
+ * reservation queue if move is enabled. Finally cleanups the queue by killing
+ * any apps (if move is disabled or move failed) and removing the queue
+ */
+ private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
+ String defReservationQueue) {
+ for (String expiredReservation : toRemove) {
+ try {
+ // reduce entitlement to 0
+ scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
+ 0.0f));
+ if (shouldMove) {
+ moveAppsInQueueSync(expiredReservation, defReservationQueue);
+ }
+ if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
+ scheduler.killAllAppsInQueue(expiredReservation);
+ LOG.info("Killing applications in queue: {}", expiredReservation);
+ } else {
+ scheduler.removeQueue(expiredReservation);
+ LOG.info("Queue: " + expiredReservation + " removed");
+ }
+ } catch (YarnException e) {
+ LOG.warn("Exception while trying to expire reservation: {}",
+ expiredReservation, e);
+ }
+ }
+ }
+
+ /**
+  * Replaces the set of tracked plans with the content of the given
+  * collection.
+  *
+  * @param plans the plans to synchronize from now on
+  */
+ @Override
+ public synchronized void setPlans(Collection<Plan> plans) {
+   final Collection<Plan> tracked = this.plans;
+   tracked.clear();
+   tracked.addAll(plans);
+ }
+
+ /**
+ * Sort in the order from the least new amount of resources asked (likely
+ * negative) to the highest. This prevents "order-of-operation" errors related
+ * to exceeding 100% capacity temporarily.
+ */
+ private List<ReservationAllocation> sortByDelta(
+ List<ReservationAllocation> currentReservations, long now) {
+ Collections.sort(currentReservations, new ReservationAllocationComparator(
+ scheduler, now));
+ return currentReservations;
+ }
+
+ /**
+  * Orders reservations by the difference between what they are allocated at
+  * time {@code now} and what their queue currently holds in the scheduler.
+  * Declared static because it only uses the scheduler and time passed to its
+  * constructor, so it does not need a reference to the enclosing follower.
+  */
+ private static class ReservationAllocationComparator implements
+     Comparator<ReservationAllocation> {
+   private final CapacityScheduler scheduler;
+   private final long now;
+
+   ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
+     this.scheduler = scheduler;
+     this.now = now;
+   }
+
+   /**
+    * Computes the resources the reservation is allocated at {@code now} minus
+    * the cluster resources scaled by the absolute capacity of its queue. If
+    * the scheduler has no queue for the reservation, the full allocation is
+    * returned.
+    */
+   private Resource getUnallocatedReservedResources(
+       ReservationAllocation reservation) {
+     Resource wanted = reservation.getResourcesAtTime(now);
+     CSQueue resQueue =
+         scheduler.getQueue(reservation.getReservationId().toString());
+     if (resQueue == null) {
+       return wanted;
+     }
+     Resource held =
+         Resources.multiply(scheduler.getClusterResource(),
+             resQueue.getAbsoluteCapacity());
+     return Resources.subtract(wanted, held);
+   }
+
+   @Override
+   public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+     // compute delta between current and previous reservation, and compare
+     // based on that
+     Resource lhsRes = getUnallocatedReservedResources(lhs);
+     Resource rhsRes = getUnallocatedReservedResources(rhs);
+     return lhsRes.compareTo(rhsRes);
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
new file mode 100644
index 0000000..9d00366
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * A PlanFollower is a component that runs on a timer, and synchronizes the
+ * underlying {@link ResourceScheduler} with the {@link Plan}(s) and vice versa.
+ *
+ * While different implementations might operate differently, the key idea is to
+ * map the current allocation of resources for each active reservation in the
+ * plan(s), to a corresponding notion in the underlying scheduler (e.g., tuning
+ * capacity of queues, set pool weights, or tweak application priorities). The
+ * goal is to affect the dynamic allocation of resources done by the scheduler
+ * so that the jobs obtain access to resources in a way that is consistent with
+ * the reservations in the plan. A key conceptual step here is to convert the
+ * absolute-valued promises made in the reservations to appropriate relative
+ * priorities/queue sizes etc.
+ *
+ * Symmetrically the PlanFollower exposes changes in cluster conditions (as
+ * tracked by the scheduler) to the plan, e.g., the overall amount of physical
+ * resources available. The Plan in turn can react by replanning its allocations
+ * if appropriate.
+ *
+ * The implementation can assume that it is run frequently enough to be able to
+ * observe and react to normal operational changes in cluster conditions on the
+ * fly (e.g., if cluster resources drop, we can update the relative weights of a
+ * queue so that the absolute promises made to the job at reservation time are
+ * respected).
+ *
+ * However, due to RM restarts and the related downtime, it is advisable for
+ * implementations to operate in a stateless way, and be able to synchronize the
+ * state of plans/scheduler regardless of how large the time gap between
+ * executions is.
+ */
+public interface PlanFollower extends Runnable {
+
+  /**
+   * Configures the PlanFollower before its first use.
+   *
+   * @param clock a reference to the system clock.
+   * @param sched a reference to the underlying scheduler
+   * @param plans references to the plans that should be kept synchronized at
+   *          every time tick.
+   */
+  void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans);
+
+  /**
+   * Performs the actual synchronization for one Plan. The run method normally
+   * calls this, but it can also be called directly (synchronously) to avoid
+   * race conditions when the start time of a user's reservation request is
+   * imminent.
+   *
+   * @param plan the Plan to synchronize
+   */
+  void synchronizePlan(Plan plan);
+
+  /**
+   * Sets the plans this follower works on.
+   *
+   * @param plans the collection of Plans we operate on at every time tick.
+   */
+  void setPlans(Collection<Plan> plans);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..4eedd42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+public class TestCapacitySchedulerPlanFollower {
+
+ final static int GB = 1024;
+
+ private Clock mClock = null;
+ private CapacityScheduler scheduler = null;
+ private RMContext rmContext;
+ private RMContext spyRMContext;
+ private CapacitySchedulerContext csContext;
+ private ReservationAgent mAgent;
+ private Plan plan;
+ private Resource minAlloc = Resource.newInstance(GB, 1);
+ private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
+ private ResourceCalculator res = new DefaultResourceCalculator();
+ private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+
+ @Rule
+ public TestName name = new TestName();
+
+ /**
+  * Builds a spied CapacityScheduler on top of a spied mock RMContext, stubs
+  * the cluster resources and the scheduler context, starts the scheduler and
+  * finally configures the reservation policy.
+  */
+ @Before
+ public void setUp() throws Exception {
+   scheduler = spy(new CapacityScheduler());
+   rmContext = TestUtils.getMockRMContext();
+   spyRMContext = spy(rmContext);
+
+   // every application lookup returns the same mocked app, which in turn
+   // reports no attempt
+   ConcurrentMap<ApplicationId, RMApp> appsSpy =
+       spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+   RMApp mockedApp = mock(RMApp.class);
+   when(mockedApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
+       .thenReturn(null);
+   Mockito.doReturn(mockedApp).when(appsSpy).get((ApplicationId) Matchers.any());
+   when(spyRMContext.getRMApps()).thenReturn(appsSpy);
+
+   CapacitySchedulerConfiguration schedConf =
+       new CapacitySchedulerConfiguration();
+   ReservationSystemTestUtil.setupQueueConfiguration(schedConf);
+
+   scheduler.setConf(schedConf);
+
+   csContext = mock(CapacitySchedulerContext.class);
+   when(csContext.getConfiguration()).thenReturn(schedConf);
+   when(csContext.getConf()).thenReturn(schedConf);
+   when(csContext.getMinimumResourceCapability()).thenReturn(minAlloc);
+   when(csContext.getMaximumResourceCapability()).thenReturn(maxAlloc);
+   when(csContext.getClusterResource()).thenReturn(
+       Resources.createResource(100 * 16 * GB, 100 * 32));
+   // the spied scheduler itself reports a 125 GB / 125 vcore cluster
+   when(scheduler.getClusterResource()).thenReturn(
+       Resources.createResource(125 * GB, 125));
+   when(csContext.getResourceCalculator()).thenReturn(
+       new DefaultResourceCalculator());
+   RMContainerTokenSecretManager tokenSecrets =
+       new RMContainerTokenSecretManager(schedConf);
+   tokenSecrets.rollMasterKey();
+   when(csContext.getContainerTokenSecretManager()).thenReturn(
+       tokenSecrets);
+
+   scheduler.setRMContext(spyRMContext);
+   scheduler.init(schedConf);
+   scheduler.start();
+
+   setupPlanFollower();
+ }
+
+ private void setupPlanFollower() throws Exception {
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ mClock = mock(Clock.class);
+ mAgent = mock(ReservationAgent.class);
+
+ String reservationQ = testUtil.getFullReservationQueueName();
+ CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+ csConf.setReservationWindow(reservationQ, 20L);
+ csConf.setMaximumCapacity(reservationQ, 40);
+ csConf.setAverageCapacity(reservationQ, 20);
+ policy.init(reservationQ, csConf);
+ }
+
+ /** Runs the plan follower scenario with move-on-expiry enabled. */
+ @Test
+ public void testWithMoveOnExpiry() throws PlanningException,
+     InterruptedException, AccessControlException {
+   final boolean moveOnExpiry = true;
+   testPlanFollower(moveOnExpiry);
+ }
+
+ /** Runs the plan follower scenario with move-on-expiry disabled (kill). */
+ @Test
+ public void testWithKillOnExpiry() throws PlanningException,
+     InterruptedException, AccessControlException {
+   final boolean moveOnExpiry = false;
+   testPlanFollower(moveOnExpiry);
+ }
+
+ private void testPlanFollower(boolean isMove) throws PlanningException,
+ InterruptedException, AccessControlException {
+ // Initialize plan based on move flag
+ plan =
+ new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+ scheduler.getClusterResource(), 1L, res,
+ scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+ null, isMove);
+
+ // add a few reservations to the plan
+ long ts = System.currentTimeMillis();
+ ReservationId r1 = ReservationId.newInstance(ts, 1);
+ int[] f1 = { 10, 10, 10, 10, 10 };
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+ "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+ .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+ ReservationId r2 = ReservationId.newInstance(ts, 2);
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+ "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+ .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
+ ReservationId r3 = ReservationId.newInstance(ts, 3);
+ int[] f2 = { 0, 10, 20, 10, 0 };
+ assertTrue(plan.toString(),
+ plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+ "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+ .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+ CapacitySchedulerPlanFollower planFollower =
+ new CapacitySchedulerPlanFollower();
+ planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+
+ when(mClock.getTime()).thenReturn(0L);
+ planFollower.run();
+
+ CSQueue defQ =
+ scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+ CSQueue q = scheduler.getQueue(r1.toString());
+ assertNotNull(q);
+ // submit an app to r1
+ String user_0 = "test-user";
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId_0 =
+ ApplicationAttemptId.newInstance(appId, 0);
+ AppAddedSchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+ scheduler.handle(addAppEvent);
+ AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+ scheduler.handle(appAttemptAddedEvent);
+
+ // initial default reservation queue should have no apps
+ Assert.assertEquals(0, defQ.getNumApplications());
+
+ Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+ Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+ Assert.assertEquals(1, q.getNumApplications());
+
+ CSQueue q2 = scheduler.getQueue(r2.toString());
+ assertNull(q2);
+ CSQueue q3 = scheduler.getQueue(r3.toString());
+ assertNull(q3);
+
+ when(mClock.getTime()).thenReturn(3L);
+ planFollower.run();
+
+ Assert.assertEquals(0, defQ.getNumApplications());
+ q = scheduler.getQueue(r1.toString());
+ assertNotNull(q);
+ Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+ Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+ Assert.assertEquals(1, q.getNumApplications());
+ q2 = scheduler.getQueue(r2.toString());
+ assertNotNull(q2);
+ Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+ Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+ q3 = scheduler.getQueue(r3.toString());
+ assertNull(q3);
+
+ when(mClock.getTime()).thenReturn(10L);
+ planFollower.run();
+
+ q = scheduler.getQueue(r1.toString());
+ if (isMove) {
+ // app should have been moved to default reservation queue
+ Assert.assertEquals(1, defQ.getNumApplications());
+ assertNull(q);
+ } else {
+ // app should be killed
+ Assert.assertEquals(0, defQ.getNumApplications());
+ assertNotNull(q);
+ AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
+ RMAppAttemptState.KILLED, false);
+ scheduler.handle(appAttemptRemovedEvent);
+ }
+ q2 = scheduler.getQueue(r2.toString());
+ assertNull(q2);
+ q3 = scheduler.getQueue(r3.toString());
+ assertNotNull(q3);
+ Assert.assertEquals(0, q3.getCapacity(), 0.01);
+ Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
+
+ when(mClock.getTime()).thenReturn(11L);
+ planFollower.run();
+
+ if (isMove) {
+ // app should have been moved to default reservation queue
+ Assert.assertEquals(1, defQ.getNumApplications());
+ } else {
+ // app should be killed
+ Assert.assertEquals(0, defQ.getNumApplications());
+ }
+ q = scheduler.getQueue(r1.toString());
+ assertNull(q);
+ q2 = scheduler.getQueue(r2.toString());
+ assertNull(q2);
+ q3 = scheduler.getQueue(r3.toString());
+ assertNotNull(q3);
+ Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
+ Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
+
+ when(mClock.getTime()).thenReturn(12L);
+ planFollower.run();
+
+ q = scheduler.getQueue(r1.toString());
+ assertNull(q);
+ q2 = scheduler.getQueue(r2.toString());
+ assertNull(q2);
+ q3 = scheduler.getQueue(r3.toString());
+ assertNotNull(q3);
+ Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
+ Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
+
+ when(mClock.getTime()).thenReturn(16L);
+ planFollower.run();
+
+ q = scheduler.getQueue(r1.toString());
+ assertNull(q);
+ q2 = scheduler.getQueue(r2.toString());
+ assertNull(q2);
+ q3 = scheduler.getQueue(r3.toString());
+ assertNull(q3);
+
+ assertTrue(defQ.getCapacity() > 0.9);
+
+ }
+
+ /**
+  * Creates an ApplicationACLsManager backed by a fresh default
+  * Configuration.
+  */
+ public static ApplicationACLsManager mockAppACLsManager() {
+   return new ApplicationACLsManager(new Configuration());
+ }
+
+ /** Stops the scheduler, if one was created by setUp. */
+ @After
+ public void tearDown() throws Exception {
+   if (scheduler == null) {
+     return;
+   }
+   scheduler.stop();
+ }
+
+}