Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/04 02:10:07 UTC

[09/16] git commit: YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit 169085319b8b76641f8b9f6840a3fef06d221e2b)

YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit 169085319b8b76641f8b9f6840a3fef06d221e2b)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3418c56b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3418c56b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3418c56b

Branch: refs/heads/trunk
Commit: 3418c56bcf4bbddaf483bdaa1a15a8bbc4039bfe
Parents: b6df0dd
Author: subru <su...@outlook.com>
Authored: Tue Sep 16 16:45:45 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Oct 3 15:42:10 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   7 +-
 .../CapacitySchedulerPlanFollower.java          | 367 +++++++++++++++++++
 .../reservation/PlanFollower.java               |  67 ++++
 .../TestCapacitySchedulerPlanFollower.java      | 319 ++++++++++++++++
 4 files changed, 758 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
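
For orientation, the following is a minimal sketch of how the new PlanFollower could be driven on a timer. The PlanFollowerDriver class, the single-thread executor, and the interval parameter are illustrative assumptions, not part of this patch:

import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacitySchedulerPlanFollower;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PlanFollower;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.SystemClock;

public class PlanFollowerDriver {
  // Schedules the follower so that run() fires at a fixed delay; run()
  // iterates over the registered plans and calls synchronizePlan() on each.
  public static ScheduledExecutorService start(ResourceScheduler scheduler,
      Collection<Plan> plans, long intervalMs) {
    PlanFollower follower = new CapacitySchedulerPlanFollower();
    follower.init(new SystemClock(), scheduler, plans);
    ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();
    timer.scheduleWithFixedDelay(follower, 0L, intervalMs,
        TimeUnit.MILLISECONDS);
    return timer;
  }
}

Note that run() and synchronizePlan() are both synchronized in the patch, so a timer tick and a direct synchronizePlan() call (e.g., for an imminent reservation) cannot interleave.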


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index e9ec691..56b3c12 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -5,10 +5,10 @@ YARN-2475. Logic for responding to capacity drops for the
 ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1708. Public YARN APIs for creating/updating/deleting 
-reservations. (Carlo Curino and Subru Krishnan via subru)
+reservations. (Subru Krishnan and Carlo Curino via subru)
 
 YARN-1709. In-memory data structures used to track resources over
-time to enable reservations. (Carlo Curino and Subru Krishnan via 
+time to enable reservations. (Subru Krishnan and Carlo Curino via 
 subru)
 
 YARN-1710. Logic to find allocations within a Plan that satisfy 
@@ -17,3 +17,6 @@ curino)
 
 YARN-1711. Policy to enforce instantaneous and over-time quotas 
 on user reservations. (Carlo Curino and Subru Krishnan via curino)
+
+YARN-1712. Plan follower that synchronizes the current state of reservation
+subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..cfa172c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a {@link PlanFollower}. It is invoked on a timer and
+ * is in charge of publishing the state of the {@link Plan}s to the underlying
+ * {@link CapacityScheduler}. It does so by adding/removing/resizing leaf
+ * queues in the scheduler, thus affecting the scheduler's dynamic behavior in
+ * a way that is consistent with the content of the plan. It also updates the
+ * plan's view of how many resources are available in the cluster.
+ * 
+ * This implementation of PlanFollower is relatively stateless: it can
+ * synchronize schedulers and Plans across arbitrary changes by performing set
+ * differences among the existing queues. This makes it insensitive to the
+ * synchronization frequency and resilient to RM restarts (no "catch up" is
+ * necessary).
+ */
+public class CapacitySchedulerPlanFollower implements PlanFollower {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacitySchedulerPlanFollower.class);
+
+  private Collection<Plan> plans = new ArrayList<Plan>();
+
+  private Clock clock;
+  private CapacityScheduler scheduler;
+
+  @Override
+  public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+    LOG.info("Initializing Plan Follower Policy:"
+        + this.getClass().getCanonicalName());
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException(
+          "CapacitySchedulerPlanFollower can only work with CapacityScheduler");
+    }
+    this.clock = clock;
+    this.scheduler = (CapacityScheduler) sched;
+    this.plans.addAll(plans);
+  }
+
+  @Override
+  public synchronized void run() {
+    for (Plan plan : plans) {
+      synchronizePlan(plan);
+    }
+  }
+
+  @Override
+  public synchronized void synchronizePlan(Plan plan) {
+    String planQueueName = plan.getQueueName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
+    }
+    // align the current time with the plan step (round up to the next step)
+    long step = plan.getStep();
+    long now = clock.getTime();
+    if (now % step != 0) {
+      now += step - (now % step);
+    }
+    CSQueue queue = scheduler.getQueue(planQueueName);
+    if (!(queue instanceof PlanQueue)) {
+      LOG.error("The Plan is not a PlanQueue!");
+      return;
+    }
+    PlanQueue planQueue = (PlanQueue) queue;
+    // first we publish to the plan the current availability of resources
+    Resource clusterResources = scheduler.getClusterResource();
+    float planAbsCap = planQueue.getAbsoluteCapacity();
+    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
+    plan.setTotalCapacity(planResources);
+
+    Set<ReservationAllocation> currentReservations =
+        plan.getReservationsAtTime(now);
+    Set<String> curReservationNames = new HashSet<String>();
+    Resource reservedResources = Resource.newInstance(0, 0);
+    int numRes = 0;
+    if (currentReservations != null) {
+      numRes = currentReservations.size();
+      for (ReservationAllocation reservation : currentReservations) {
+        curReservationNames.add(reservation.getReservationId().toString());
+        Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
+      }
+    }
+    // create the default reservation queue if it doesn't exist
+    String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    if (scheduler.getQueue(defReservationQueue) == null) {
+      ReservationQueue defQueue =
+          new ReservationQueue(scheduler, defReservationQueue, planQueue);
+      try {
+        scheduler.addQueue(defQueue);
+      } catch (SchedulerDynamicEditException e) {
+        LOG.warn(
+            "Exception while trying to create default reservation queue for plan: {}",
+            planQueueName, e);
+      }
+    }
+    curReservationNames.add(defReservationQueue);
+    // if the resources dedicated to this plan have shrunk, invoke the replanner
+    if (Resources.greaterThan(scheduler.getResourceCalculator(),
+        clusterResources, reservedResources, planResources)) {
+      try {
+        plan.getReplanner().plan(plan, null);
+      } catch (PlanningException e) {
+        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+      }
+    }
+    // identify the reservations that have expired and new reservations that
+    // have to be activated
+    List<CSQueue> resQueues = planQueue.getChildQueues();
+    Set<String> expired = new HashSet<String>();
+    for (CSQueue resQueue : resQueues) {
+      String resQueueName = resQueue.getQueueName();
+      if (curReservationNames.contains(resQueueName)) {
+        // it is an existing reservation, so there is no need to create a
+        // new reservation queue
+        curReservationNames.remove(resQueueName);
+      } else {
+        // the reservation has terminated, mark it for cleanup
+        expired.add(resQueueName);
+      }
+    }
+    // garbage collect expired reservations
+    cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
+
+    // Add new reservations and update existing ones
+    float totalAssignedCapacity = 0f;
+    if (currentReservations != null) {
+      // first release all excess capacity in default queue
+      try {
+        scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
+            1.0f));
+      } catch (YarnException e) {
+        LOG.warn(
+            "Exception while trying to release default queue capacity for plan: {}",
+            planQueueName, e);
+      }
+      // sort allocations from the one giving up the most resources to the
+      // one asking for the most; this avoids order-of-operation errors
+      // that temporarily violate the 100% capacity bound
+      List<ReservationAllocation> sortedAllocations =
+          sortByDelta(
+              new ArrayList<ReservationAllocation>(currentReservations), now);
+      for (ReservationAllocation res : sortedAllocations) {
+        String currResId = res.getReservationId().toString();
+        if (curReservationNames.contains(currResId)) {
+          ReservationQueue resQueue =
+              new ReservationQueue(scheduler, currResId, planQueue);
+          try {
+            scheduler.addQueue(resQueue);
+          } catch (SchedulerDynamicEditException e) {
+            LOG.warn(
+                "Exception while trying to activate reservation: {} for plan: {}",
+                currResId, planQueueName, e);
+          }
+        }
+        Resource capToAssign = res.getResourcesAtTime(now);
+        float targetCapacity = 0f;
+        if (planResources.getMemory() > 0
+            && planResources.getVirtualCores() > 0) {
+          targetCapacity =
+              Resources.divide(scheduler.getResourceCalculator(),
+                  clusterResources, capToAssign, planResources);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Assigning capacity of {} to queue {} with target capacity {}",
+              capToAssign, currResId, targetCapacity);
+        }
+        // set maxCapacity to 100% unless the job requires gang scheduling,
+        // in which case we stick to the target capacity (as running early
+        // is likely a waste of resources)
+        float maxCapacity = 1.0f;
+        if (res.containsGangs()) {
+          maxCapacity = targetCapacity;
+        }
+        try {
+          scheduler.setEntitlement(currResId, new QueueEntitlement(
+              targetCapacity, maxCapacity));
+        } catch (YarnException e) {
+          LOG.warn("Exception while trying to size reservation for plan: {}",
+              currResId, planQueueName, e);
+        }
+        totalAssignedCapacity += targetCapacity;
+      }
+    }
+    // compute the default queue capacity
+    float defQCap = 1.0f - totalAssignedCapacity;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+          + "currReservation: {} default-queue capacity: {}", planResources,
+          numRes, defQCap);
+    }
+    // set the default queue to eat up all remaining capacity
+    try {
+      scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
+          defQCap, 1.0f));
+    } catch (YarnException e) {
+      LOG.warn(
+          "Exception while trying to reclaim default queue capacity for plan: {}",
+          planQueueName, e);
+    }
+    // garbage collect finished reservations from plan
+    try {
+      plan.archiveCompletedReservations(now);
+    } catch (PlanningException e) {
+      LOG.error("Exception in archiving completed reservations: ", e);
+    }
+    LOG.info("Finished iteration of plan follower edit policy for plan: "
+        + planQueueName);
+
+    // Extension: update plan with app states,
+    // useful to support smart replanning
+  }
+
+  /**
+   * Move all apps in the given expired reservation queue to the parent plan
+   * queue's default reservation queue, in a synchronous fashion.
+   */
+  private void moveAppsInQueueSync(String expiredReservation,
+      String defReservationQueue) {
+    List<ApplicationAttemptId> activeApps =
+        scheduler.getAppsInQueue(expiredReservation);
+    if (activeApps.isEmpty()) {
+      return;
+    }
+    for (ApplicationAttemptId app : activeApps) {
+      // fallback to parent's default queue
+      try {
+        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error during migration of application: {} from reservation: {}",
+            app, expiredReservation, e);
+      }
+    }
+  }
+
+  /**
+   * First sets the entitlement of the queues to zero to prevent new app
+   * submissions. Then, if move is enabled, moves all apps in the set of queues
+   * to the parent plan queue's default reservation queue. Finally cleans up
+   * each queue by killing any remaining apps (if move is disabled or the move
+   * failed) and removing the queue.
+   */
+  private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
+      String defReservationQueue) {
+    for (String expiredReservation : toRemove) {
+      try {
+        // reduce entitlement to 0
+        scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
+            0.0f));
+        if (shouldMove) {
+          moveAppsInQueueSync(expiredReservation, defReservationQueue);
+        }
+        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
+          scheduler.killAllAppsInQueue(expiredReservation);
+          LOG.info("Killing applications in queue: {}", expiredReservation);
+        } else {
+          scheduler.removeQueue(expiredReservation);
+          LOG.info("Queue: " + expiredReservation + " removed");
+        }
+      } catch (YarnException e) {
+        LOG.warn("Exception while trying to expire reservation: {}",
+            expiredReservation, e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void setPlans(Collection<Plan> plans) {
+    this.plans.clear();
+    this.plans.addAll(plans);
+  }
+
+  /**
+   * Sort from the reservation asking for the least amount of new resources
+   * (likely negative, i.e., giving resources back) to the one asking for the
+   * most. This prevents "order-of-operation" errors that would temporarily
+   * exceed 100% capacity.
+   */
+  private List<ReservationAllocation> sortByDelta(
+      List<ReservationAllocation> currentReservations, long now) {
+    Collections.sort(currentReservations, new ReservationAllocationComparator(
+        scheduler, now));
+    return currentReservations;
+  }
+
+  private class ReservationAllocationComparator implements
+      Comparator<ReservationAllocation> {
+    CapacityScheduler scheduler;
+    long now;
+
+    ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
+      this.scheduler = scheduler;
+      this.now = now;
+    }
+
+    private Resource getUnallocatedReservedResources(
+        ReservationAllocation reservation) {
+      Resource resResource;
+      CSQueue resQueue =
+          scheduler.getQueue(reservation.getReservationId().toString());
+      if (resQueue != null) {
+        resResource =
+            Resources.subtract(
+                reservation.getResourcesAtTime(now),
+                Resources.multiply(scheduler.getClusterResource(),
+                    resQueue.getAbsoluteCapacity()));
+      } else {
+        resResource = reservation.getResourcesAtTime(now);
+      }
+      return resResource;
+    }
+
+    @Override
+    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+      // compute delta between current and previous reservation, and compare
+      // based on that
+      Resource lhsRes = getUnallocatedReservedResources(lhs);
+      Resource rhsRes = getUnallocatedReservedResources(rhs);
+      return lhsRes.compareTo(rhsRes);
+    }
+
+  }
+
+}
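
To make the sortByDelta ordering above concrete, here is a standalone sketch with made-up capacity figures. The Alloc class is hypothetical; the real comparator subtracts the resources already allocated to a reservation's queue from the resources it needs now, and compares those Resource deltas:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortByDeltaDemo {
  // Hypothetical stand-in for a reservation: current queue capacity vs. the
  // capacity it requires at time "now" (as fractions of the plan queue).
  static final class Alloc {
    final String name;
    final float current;
    final float required;
    Alloc(String name, float current, float required) {
      this.name = name;
      this.current = current;
      this.required = required;
    }
    float delta() {
      return required - current; // negative = gives capacity back
    }
  }

  public static void main(String[] args) {
    List<Alloc> allocs = new ArrayList<>();
    allocs.add(new Alloc("res-grow", 0.2f, 0.6f));   // asks for +0.4
    allocs.add(new Alloc("res-shrink", 0.5f, 0.1f)); // gives back 0.4
    // Shrinking first keeps the sum of entitlements <= 1.0 at every step;
    // growing first would momentarily demand 0.6 + 0.5 = 110% of the plan.
    allocs.sort(Comparator.comparingDouble(Alloc::delta));
    for (Alloc a : allocs) {
      System.out.printf("%s: %.1f -> %.1f%n", a.name, a.current, a.required);
    }
  }
}

Resizing res-shrink first frees 0.4 of the plan queue before res-grow claims it, which is why the follower never transiently violates the capacity bound.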

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
new file mode 100644
index 0000000..9d00366
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * A PlanFollower is a component that runs on a timer and synchronizes the
+ * underlying {@link ResourceScheduler} with the {@link Plan}(s), and vice versa.
+ * 
+ * While different implementations might operate differently, the key idea is
+ * to map the current allocation of resources for each active reservation in
+ * the plan(s) to a corresponding notion in the underlying scheduler (e.g.,
+ * tuning queue capacities, setting pool weights, or tweaking application
+ * priorities). The goal is to steer the dynamic allocation of resources done
+ * by the scheduler so that jobs obtain access to resources in a way that is
+ * consistent with the reservations in the plan. A key conceptual step is
+ * converting the absolute-valued promises made in the reservations into
+ * appropriate relative priorities and queue sizes.
+ * 
+ * Symmetrically, the PlanFollower exposes changes in cluster conditions (as
+ * tracked by the scheduler) to the plan, e.g., the overall amount of physical
+ * resources available. The Plan in turn can react by replanning its allocations
+ * if appropriate.
+ * 
+ * The implementation can assume that it is run frequently enough to be able to
+ * observe and react to normal operational changes in cluster conditions on the
+ * fly (e.g., if cluster resources drop, we can update the relative weights of a
+ * queue so that the absolute promises made to the job at reservation time are
+ * respected).
+ * 
+ * However, due to RM restarts and the related downtime, it is advisable for
+ * implementations to operate in a stateless way and to be able to synchronize
+ * the state of plans/scheduler regardless of how big the time gap between
+ * executions is.
+ */
+public interface PlanFollower extends Runnable {
+
+  /**
+   * Init function that configures the PlanFollower by providing:
+   * 
+   * @param clock a reference to the system clock.
+   * @param sched a reference to the underlying scheduler
+   * @param plans references to the plans we should keep synchronized at every
+   *          time tick.
+   */
+  public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans);
+
+  /**
+   * The function performing the actual synchronization operation for a given
+   * Plan. This is normally invoked by the run method, but it can be invoked
+   * synchronously to avoid race conditions when a user's reservation request
+   * start time is imminent.
+   * 
+   * @param plan the Plan to synchronize
+   */
+  public void synchronizePlan(Plan plan);
+
+  /**
+   * Setter for the list of plans.
+   * 
+   * @param plans the collection of Plans we operate on at every time tick.
+   */
+  public void setPlans(Collection<Plan> plans);
+
+}
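
The "absolute to relative" conversion described in the javadoc reduces, in the CapacityScheduler case, to dividing the resources a reservation needs right now by the plan queue's total resources. A sketch over memory alone, with illustrative figures (the real follower uses Resources.divide() with the scheduler's ResourceCalculator, over both memory and vcores):

public class AbsoluteToRelativeDemo {
  public static void main(String[] args) {
    long clusterMemMB = 128000;    // total cluster memory (illustrative)
    float planAbsCapacity = 0.5f;  // plan queue's absolute capacity
    long planMemMB = (long) (clusterMemMB * planAbsCapacity); // 64000 MB
    long reservationMemMB = 16000; // what the reservation needs at time "now"
    // relative entitlement of the reservation's queue inside the plan queue
    float targetCapacity = (float) reservationMemMB / planMemMB; // 0.25 == 25%
    System.out.printf("target capacity = %.2f%n", targetCapacity);
  }
}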

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3418c56b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..4eedd42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+public class TestCapacitySchedulerPlanFollower {
+
+  final static int GB = 1024;
+
+  private Clock mClock = null;
+  private CapacityScheduler scheduler = null;
+  private RMContext rmContext;
+  private RMContext spyRMContext;
+  private CapacitySchedulerContext csContext;
+  private ReservationAgent mAgent;
+  private Plan plan;
+  private Resource minAlloc = Resource.newInstance(GB, 1);
+  private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
+  private ResourceCalculator res = new DefaultResourceCalculator();
+  private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    CapacityScheduler spyCs = new CapacityScheduler();
+    scheduler = spy(spyCs);
+    rmContext = TestUtils.getMockRMContext();
+    spyRMContext = spy(rmContext);
+
+    ConcurrentMap<ApplicationId, RMApp> spyApps =
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
+        .thenReturn(null);
+    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
+    when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(csConf);
+
+    scheduler.setConf(csConf);
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(minAlloc);
+    when(csContext.getMaximumResourceCapability()).thenReturn(maxAlloc);
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(scheduler.getClusterResource()).thenReturn(
+        Resources.createResource(125 * GB, 125));
+    when(csContext.getResourceCalculator()).thenReturn(
+        new DefaultResourceCalculator());
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(csConf);
+    containerTokenSecretManager.rollMasterKey();
+    when(csContext.getContainerTokenSecretManager()).thenReturn(
+        containerTokenSecretManager);
+
+    scheduler.setRMContext(spyRMContext);
+    scheduler.init(csConf);
+    scheduler.start();
+
+    setupPlanFollower();
+  }
+
+  private void setupPlanFollower() throws Exception {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    mClock = mock(Clock.class);
+    mAgent = mock(ReservationAgent.class);
+
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+    csConf.setReservationWindow(reservationQ, 20L);
+    csConf.setMaximumCapacity(reservationQ, 40);
+    csConf.setAverageCapacity(reservationQ, 20);
+    policy.init(reservationQ, csConf);
+  }
+
+  @Test
+  public void testWithMoveOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with move
+    testPlanFollower(true);
+  }
+
+  @Test
+  public void testWithKillOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with kill
+    testPlanFollower(false);
+  }
+
+  private void testPlanFollower(boolean isMove) throws PlanningException,
+      InterruptedException, AccessControlException {
+    // Initialize plan based on move flag
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), 1L, res,
+            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+            null, isMove);
+
+    // add a few reservations to the plan
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f1 = { 10, 10, 10, 10, 10 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    int[] f2 = { 0, 10, 20, 10, 0 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+    CapacitySchedulerPlanFollower planFollower =
+        new CapacitySchedulerPlanFollower();
+    planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+
+    when(mClock.getTime()).thenReturn(0L);
+    planFollower.run();
+
+    CSQueue defQ =
+        scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+    CSQueue q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    // submit an app to r1
+    String user_0 = "test-user";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId_0 =
+        ApplicationAttemptId.newInstance(appId, 0);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+    scheduler.handle(appAttemptAddedEvent);
+
+    // initial default reservation queue should have no apps
+    Assert.assertEquals(0, defQ.getNumApplications());
+
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+    Assert.assertEquals(1, q.getNumApplications());
+
+    CSQueue q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    CSQueue q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    when(mClock.getTime()).thenReturn(3L);
+    planFollower.run();
+
+    Assert.assertEquals(0, defQ.getNumApplications());
+    q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+    Assert.assertEquals(1, q.getNumApplications());
+    q2 = scheduler.getQueue(r2.toString());
+    assertNotNull(q2);
+    Assert.assertEquals(0.1, q2.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q2.getMaximumCapacity(), 1.0);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    when(mClock.getTime()).thenReturn(10L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, defQ.getNumApplications());
+      assertNull(q);
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, defQ.getNumApplications());
+      assertNotNull(q);
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
+              RMAppAttemptState.KILLED, false);
+      scheduler.handle(appAttemptRemovedEvent);
+    }
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0, q3.getCapacity(), 0.01);
+    Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
+
+    when(mClock.getTime()).thenReturn(11L);
+    planFollower.run();
+
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, defQ.getNumApplications());
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, defQ.getNumApplications());
+    }
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
+
+    when(mClock.getTime()).thenReturn(12L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
+
+    when(mClock.getTime()).thenReturn(16L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    assertTrue(defQ.getCapacity() > 0.9);
+
+  }
+
+  public static ApplicationACLsManager mockAppACLsManager() {
+    Configuration conf = new Configuration();
+    return new ApplicationACLsManager(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.stop();
+    }
+  }
+
+}