You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:24 UTC

[09/33] git commit: YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit 169085319b8b76641f8b9f6840a3fef06d221e2b) (cherry picked from commit 3418c56bcf4bbddaf483bdaa1a15a8bbc4039bfe)

YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit 169085319b8b76641f8b9f6840a3fef06d221e2b)
(cherry picked from commit 3418c56bcf4bbddaf483bdaa1a15a8bbc4039bfe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fec639cd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fec639cd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fec639cd

Branch: refs/heads/branch-2
Commit: fec639cda14f56b87052512c166c8e42a0ba6cf4
Parents: 6bfdaf0
Author: subru <su...@outlook.com>
Authored: Tue Sep 16 16:45:45 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:21:06 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   7 +-
 .../CapacitySchedulerPlanFollower.java          | 367 +++++++++++++++++++
 .../reservation/PlanFollower.java               |  67 ++++
 .../TestCapacitySchedulerPlanFollower.java      | 319 ++++++++++++++++
 4 files changed, 758 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fec639cd/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index e9ec691..56b3c12 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -5,10 +5,10 @@ YARN-2475. Logic for responding to capacity drops for the
 ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1708. Public YARN APIs for creating/updating/deleting 
-reservations. (Carlo Curino and Subru Krishnan via subru)
+reservations. (Subru Krishnan and Carlo Curino via subru)
 
 YARN-1709. In-memory data structures used to track resources over
-time to enable reservations. (Carlo Curino and Subru Krishnan via 
+time to enable reservations. (Subru Krishnan and Carlo Curino via 
 subru)
 
 YARN-1710. Logic to find allocations within a Plan that satisfy 
@@ -17,3 +17,6 @@ curino)
 
 YARN-1711. Policy to enforce instantaneous and over-time quotas 
 on user reservations. (Carlo Curino and Subru Krishnan via curino)
+
+YARN-1712. Plan follower that synchronizes the current state of reservation
+subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fec639cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..cfa172c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements a {@link PlanFollower}. This is invoked on a timer, and
+ * it is in charge to publish the state of the {@link Plan}s to the underlying
+ * {@link CapacityScheduler}. This implementation does so, by
+ * adding/removing/resizing leaf queues in the scheduler, thus affecting the
+ * dynamic behavior of the scheduler in a way that is consistent with the
+ * content of the plan. It also updates the plan's view on how much resources
+ * are available in the cluster.
+ * 
+ * This implementation of PlanFollower is relatively stateless, and it can
+ * synchronize schedulers and Plans that have arbitrary changes (performing set
+ * differences among existing queues). This makes it resilient to frequency of
+ * synchronization, and RM restart issues (no "catch up" is necessary).
+ */
+public class CapacitySchedulerPlanFollower implements PlanFollower {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacitySchedulerPlanFollower.class);
+
+  private Collection<Plan> plans = new ArrayList<Plan>();
+
+  private Clock clock;
+  private CapacityScheduler scheduler;
+
+  @Override
+  public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
+    LOG.info("Initializing Plan Follower Policy:"
+        + this.getClass().getCanonicalName());
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException(
+          "CapacitySchedulerPlanFollower can only work with CapacityScheduler");
+    }
+    this.clock = clock;
+    this.scheduler = (CapacityScheduler) sched;
+    this.plans.addAll(plans);
+  }
+
+  @Override
+  public synchronized void run() {
+    for (Plan plan : plans) {
+      synchronizePlan(plan);
+    }
+  }
+
+  @Override
+  public synchronized void synchronizePlan(Plan plan) {
+    String planQueueName = plan.getQueueName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
+    }
+    // align with plan step
+    long step = plan.getStep();
+    long now = clock.getTime();
+    if (now % step != 0) {
+      now += step - (now % step);
+    }
+    CSQueue queue = scheduler.getQueue(planQueueName);
+    if (!(queue instanceof PlanQueue)) {
+      LOG.error("The Plan is not an PlanQueue!");
+      return;
+    }
+    PlanQueue planQueue = (PlanQueue) queue;
+    // first we publish to the plan the current availability of resources
+    Resource clusterResources = scheduler.getClusterResource();
+    float planAbsCap = planQueue.getAbsoluteCapacity();
+    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
+    plan.setTotalCapacity(planResources);
+
+    Set<ReservationAllocation> currentReservations =
+        plan.getReservationsAtTime(now);
+    Set<String> curReservationNames = new HashSet<String>();
+    Resource reservedResources = Resource.newInstance(0, 0);
+    int numRes = 0;
+    if (currentReservations != null) {
+      numRes = currentReservations.size();
+      for (ReservationAllocation reservation : currentReservations) {
+        curReservationNames.add(reservation.getReservationId().toString());
+        Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
+      }
+    }
+    // create the default reservation queue if it doesnt exist
+    String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    if (scheduler.getQueue(defReservationQueue) == null) {
+      ReservationQueue defQueue =
+          new ReservationQueue(scheduler, defReservationQueue, planQueue);
+      try {
+        scheduler.addQueue(defQueue);
+      } catch (SchedulerDynamicEditException e) {
+        LOG.warn(
+            "Exception while trying to create default reservation queue for plan: {}",
+            planQueueName, e);
+      }
+    }
+    curReservationNames.add(defReservationQueue);
+    // if the resources dedicated to this plan has shrunk invoke replanner
+    if (Resources.greaterThan(scheduler.getResourceCalculator(),
+        clusterResources, reservedResources, planResources)) {
+      try {
+        plan.getReplanner().plan(plan, null);
+      } catch (PlanningException e) {
+        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+      }
+    }
+    // identify the reservations that have expired and new reservations that
+    // have to be activated
+    List<CSQueue> resQueues = planQueue.getChildQueues();
+    Set<String> expired = new HashSet<String>();
+    for (CSQueue resQueue : resQueues) {
+      String resQueueName = resQueue.getQueueName();
+      if (curReservationNames.contains(resQueueName)) {
+        // it is already existing reservation, so needed not create new
+        // reservation queue
+        curReservationNames.remove(resQueueName);
+      } else {
+        // the reservation has termination, mark for cleanup
+        expired.add(resQueueName);
+      }
+    }
+    // garbage collect expired reservations
+    cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
+
+    // Add new reservations and update existing ones
+    float totalAssignedCapacity = 0f;
+    if (currentReservations != null) {
+      // first release all excess capacity in default queue
+      try {
+        scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
+            1.0f));
+      } catch (YarnException e) {
+        LOG.warn(
+            "Exception while trying to release default queue capacity for plan: {}",
+            planQueueName, e);
+      }
+      // sort allocations from the one giving up the most resources, to the
+      // one asking for the most
+      // avoid order-of-operation errors that temporarily violate 100%
+      // capacity bound
+      List<ReservationAllocation> sortedAllocations =
+          sortByDelta(
+              new ArrayList<ReservationAllocation>(currentReservations), now);
+      for (ReservationAllocation res : sortedAllocations) {
+        String currResId = res.getReservationId().toString();
+        if (curReservationNames.contains(currResId)) {
+          ReservationQueue resQueue =
+              new ReservationQueue(scheduler, currResId, planQueue);
+          try {
+            scheduler.addQueue(resQueue);
+          } catch (SchedulerDynamicEditException e) {
+            LOG.warn(
+                "Exception while trying to activate reservation: {} for plan: {}",
+                currResId, planQueueName, e);
+          }
+        }
+        Resource capToAssign = res.getResourcesAtTime(now);
+        float targetCapacity = 0f;
+        if (planResources.getMemory() > 0
+            && planResources.getVirtualCores() > 0) {
+          targetCapacity =
+              Resources.divide(scheduler.getResourceCalculator(),
+                  clusterResources, capToAssign, planResources);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Assigning capacity of {} to queue {} with target capacity {}",
+              capToAssign, currResId, targetCapacity);
+        }
+        // set maxCapacity to 100% unless the job requires gang, in which
+        // case we stick to capacity (as running early/before is likely a
+        // waste of resources)
+        float maxCapacity = 1.0f;
+        if (res.containsGangs()) {
+          maxCapacity = targetCapacity;
+        }
+        try {
+          scheduler.setEntitlement(currResId, new QueueEntitlement(
+              targetCapacity, maxCapacity));
+        } catch (YarnException e) {
+          LOG.warn("Exception while trying to size reservation for plan: {}",
+              currResId, planQueueName, e);
+        }
+        totalAssignedCapacity += targetCapacity;
+      }
+    }
+    // compute the default queue capacity
+    float defQCap = 1.0f - totalAssignedCapacity;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+          + "currReservation: {} default-queue capacity: {}", planResources,
+          numRes, defQCap);
+    }
+    // set the default queue to eat-up all remaining capacity
+    try {
+      scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
+          defQCap, 1.0f));
+    } catch (YarnException e) {
+      LOG.warn(
+          "Exception while trying to reclaim default queue capacity for plan: {}",
+          planQueueName, e);
+    }
+    // garbage collect finished reservations from plan
+    try {
+      plan.archiveCompletedReservations(now);
+    } catch (PlanningException e) {
+      LOG.error("Exception in archiving completed reservations: ", e);
+    }
+    LOG.info("Finished iteration of plan follower edit policy for plan: "
+        + planQueueName);
+
+    // Extension: update plan with app states,
+    // useful to support smart replanning
+  }
+
+  /**
+   * Move all apps in the set of queues to the parent plan queue's default
+   * reservation queue in a synchronous fashion
+   */
+  private void moveAppsInQueueSync(String expiredReservation,
+      String defReservationQueue) {
+    List<ApplicationAttemptId> activeApps =
+        scheduler.getAppsInQueue(expiredReservation);
+    if (activeApps.isEmpty()) {
+      return;
+    }
+    for (ApplicationAttemptId app : activeApps) {
+      // fallback to parent's default queue
+      try {
+        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error during migration of application: {} from reservation: {}",
+            app, expiredReservation, e);
+      }
+    }
+  }
+
+  /**
+   * First sets entitlement of queues to zero to prevent new app submission.
+   * Then move all apps in the set of queues to the parent plan queue's default
+   * reservation queue if move is enabled. Finally cleanups the queue by killing
+   * any apps (if move is disabled or move failed) and removing the queue
+   */
+  private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
+      String defReservationQueue) {
+    for (String expiredReservation : toRemove) {
+      try {
+        // reduce entitlement to 0
+        scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
+            0.0f));
+        if (shouldMove) {
+          moveAppsInQueueSync(expiredReservation, defReservationQueue);
+        }
+        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
+          scheduler.killAllAppsInQueue(expiredReservation);
+          LOG.info("Killing applications in queue: {}", expiredReservation);
+        } else {
+          scheduler.removeQueue(expiredReservation);
+          LOG.info("Queue: " + expiredReservation + " removed");
+        }
+      } catch (YarnException e) {
+        LOG.warn("Exception while trying to expire reservation: {}",
+            expiredReservation, e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void setPlans(Collection<Plan> plans) {
+    this.plans.clear();
+    this.plans.addAll(plans);
+  }
+
+  /**
+   * Sort in the order from the least new amount of resources asked (likely
+   * negative) to the highest. This prevents "order-of-operation" errors related
+   * to exceeding 100% capacity temporarily.
+   */
+  private List<ReservationAllocation> sortByDelta(
+      List<ReservationAllocation> currentReservations, long now) {
+    Collections.sort(currentReservations, new ReservationAllocationComparator(
+        scheduler, now));
+    return currentReservations;
+  }
+
+  private class ReservationAllocationComparator implements
+      Comparator<ReservationAllocation> {
+    CapacityScheduler scheduler;
+    long now;
+
+    ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
+      this.scheduler = scheduler;
+      this.now = now;
+    }
+
+    private Resource getUnallocatedReservedResources(
+        ReservationAllocation reservation) {
+      Resource resResource;
+      CSQueue resQueue =
+          scheduler.getQueue(reservation.getReservationId().toString());
+      if (resQueue != null) {
+        resResource =
+            Resources.subtract(
+                reservation.getResourcesAtTime(now),
+                Resources.multiply(scheduler.getClusterResource(),
+                    resQueue.getAbsoluteCapacity()));
+      } else {
+        resResource = reservation.getResourcesAtTime(now);
+      }
+      return resResource;
+    }
+
+    @Override
+    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+      // compute delta between current and previous reservation, and compare
+      // based on that
+      Resource lhsRes = getUnallocatedReservedResources(lhs);
+      Resource rhsRes = getUnallocatedReservedResources(rhs);
+      return lhsRes.compareTo(rhsRes);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fec639cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
new file mode 100644
index 0000000..9d00366
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
@@ -0,0 +1,67 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * A PlanFollower is a component that runs on a timer, and synchronizes the
+ * underlying {@link ResourceScheduler} with the {@link Plan}(s) and vice versa.
+ *
+ * While different implementations might operate differently, the key idea is to
+ * map the current allocation of resources for each active reservation in the
+ * plan(s), to a corresponding notion in the underlying scheduler (e.g., tuning
+ * capacity of queues, set pool weights, or tweak application priorities). The
+ * goal is to affect the dynamic allocation of resources done by the scheduler
+ * so that the jobs obtain access to resources in a way that is consistent with
+ * the reservations in the plan. A key conceptual step here is to convert the
+ * absolute-valued promises made in the reservations to appropriate relative
+ * priorities/queue sizes etc.
+ *
+ * Symmetrically the PlanFollower exposes changes in cluster conditions (as
+ * tracked by the scheduler) to the plan, e.g., the overall amount of physical
+ * resources available. The Plan in turn can react by replanning its allocations
+ * if appropriate.
+ *
+ * The implementation can assume that it is run frequently enough to be able to
+ * observe and react to normal operational changes in cluster conditions on the
+ * fly (e.g., if cluster resources drop, we can update the relative weights of a
+ * queue so that the absolute promises made to the job at reservation time are
+ * respected).
+ *
+ * However, due to RM restarts and the related downtime, it is advisable for
+ * implementations to operate in a stateless way, and be able to synchronize the
+ * state of plans/scheduler regardless of how big the time gap between
+ * executions is.
+ */
+public interface PlanFollower extends Runnable {
+
+  /**
+   * Configures the PlanFollower.
+   *
+   * @param clock a reference to the system clock.
+   * @param sched a reference to the underlying scheduler
+   * @param plans references to the plans we should keep synchronized at every
+   *          time tick.
+   */
+  void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans);
+
+  /**
+   * Performs the actual synchronization operation for a given Plan. This is
+   * normally invoked by the run method, but it can be invoked synchronously to
+   * avoid race conditions when a user's reservation request start time is
+   * imminent.
+   *
+   * @param plan the Plan to synchronize
+   */
+  void synchronizePlan(Plan plan);
+
+  /**
+   * Setter for the list of plans.
+   *
+   * @param plans the collection of Plans we operate on at every time tick.
+   */
+  void setPlans(Collection<Plan> plans);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fec639cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
new file mode 100644
index 0000000..4eedd42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+public class TestCapacitySchedulerPlanFollower {
+
+  final static int GB = 1024;
+
+  private Clock mClock = null;
+  private CapacityScheduler scheduler = null;
+  private RMContext rmContext;
+  private RMContext spyRMContext;
+  private CapacitySchedulerContext csContext;
+  private ReservationAgent mAgent;
+  private Plan plan;
+  private Resource minAlloc = Resource.newInstance(GB, 1);
+  private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
+  private ResourceCalculator res = new DefaultResourceCalculator();
+  private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    CapacityScheduler spyCs = new CapacityScheduler();
+    scheduler = spy(spyCs);
+    rmContext = TestUtils.getMockRMContext();
+    spyRMContext = spy(rmContext);
+
+    ConcurrentMap<ApplicationId, RMApp> spyApps =
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
+        .thenReturn(null);
+    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
+    when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(csConf);
+
+    scheduler.setConf(csConf);
+
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(minAlloc);
+    when(csContext.getMaximumResourceCapability()).thenReturn(maxAlloc);
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(scheduler.getClusterResource()).thenReturn(
+        Resources.createResource(125 * GB, 125));
+    when(csContext.getResourceCalculator()).thenReturn(
+        new DefaultResourceCalculator());
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(csConf);
+    containerTokenSecretManager.rollMasterKey();
+    when(csContext.getContainerTokenSecretManager()).thenReturn(
+        containerTokenSecretManager);
+
+    scheduler.setRMContext(spyRMContext);
+    scheduler.init(csConf);
+    scheduler.start();
+
+    setupPlanFollower();
+  }
+
+  private void setupPlanFollower() throws Exception {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    mClock = mock(Clock.class);
+    mAgent = mock(ReservationAgent.class);
+
+    String reservationQ = testUtil.getFullReservationQueueName();
+    CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+    csConf.setReservationWindow(reservationQ, 20L);
+    csConf.setMaximumCapacity(reservationQ, 40);
+    csConf.setAverageCapacity(reservationQ, 20);
+    policy.init(reservationQ, csConf);
+  }
+
+  @Test
+  public void testWithMoveOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with move
+    testPlanFollower(true);
+  }
+
+  @Test
+  public void testWithKillOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with kill
+    testPlanFollower(false);
+  }
+
+  /**
+   * Drives a {@link CapacitySchedulerPlanFollower} through a fixed timeline on
+   * a mocked clock and checks the reservation queues after each
+   * {@code run()}.
+   *
+   * Reservations added to the "dedicated" plan (start/end as passed to
+   * {@link InMemoryReservationAllocation}):
+   * r1 (user u3): 0..5, profile {10, 10, 10, 10, 10};
+   * r2 (user u3): 3..8, profile {10, 10, 10, 10, 10};
+   * r3 (user u4): 10..15, profile {0, 10, 20, 10, 0}.
+   *
+   * @param isMove passed to the {@link InMemoryPlan} as its move-on-expiry
+   *          flag; when true the app submitted to r1 is expected in the
+   *          plan's default queue once r1 expires, otherwise the app is
+   *          expected to be killed
+   */
+  private void testPlanFollower(boolean isMove) throws PlanningException,
+      InterruptedException, AccessControlException {
+    // Initialize plan based on move flag
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), 1L, res,
+            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+            null, isMove);
+
+    // add a few reservations to the plan
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f1 = { 10, 10, 10, 10, 10 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    int[] f2 = { 0, 10, 20, 10, 0 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+    CapacitySchedulerPlanFollower planFollower =
+        new CapacitySchedulerPlanFollower();
+    planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+
+    // NOTE(review): with a delta of 1.0 the maximum-capacity assertions below
+    // accept almost any value in [0, 1]; confirm the intended maximum
+    // capacity before tightening them.
+
+    // t=0: only r1 has started, so only its queue should exist
+    when(mClock.getTime()).thenReturn(0L);
+    planFollower.run();
+
+    CSQueue defQ =
+        scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+    CSQueue q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    // submit an app to r1
+    String user_0 = "test-user";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId_0 =
+        ApplicationAttemptId.newInstance(appId, 0);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+    scheduler.handle(appAttemptAddedEvent);
+
+    // initial default reservation queue should have no apps
+    Assert.assertEquals(0, defQ.getNumApplications());
+
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+    Assert.assertEquals(1, q.getNumApplications());
+
+    CSQueue q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    CSQueue q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    // t=3: r1 and r2 overlap, each with an allocation of 10
+    when(mClock.getTime()).thenReturn(3L);
+    planFollower.run();
+
+    Assert.assertEquals(0, defQ.getNumApplications());
+    q = scheduler.getQueue(r1.toString());
+    assertNotNull(q);
+    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
+    Assert.assertEquals(1, q.getNumApplications());
+    q2 = scheduler.getQueue(r2.toString());
+    assertNotNull(q2);
+    // verify r2's own queue, not r1's again
+    Assert.assertEquals(0.1, q2.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q2.getMaximumCapacity(), 1.0);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    // t=10: r1 (ends at 5) and r2 (ends at 8) are over; r3 starts with a
+    // zero allocation
+    when(mClock.getTime()).thenReturn(10L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, defQ.getNumApplications());
+      assertNull(q);
+    } else {
+      // app should be killed; r1's queue is still present here and the test
+      // delivers the attempt-removed (KILLED) event itself before moving on
+      Assert.assertEquals(0, defQ.getNumApplications());
+      assertNotNull(q);
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
+              RMAppAttemptState.KILLED, false);
+      scheduler.handle(appAttemptRemovedEvent);
+    }
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0, q3.getCapacity(), 0.01);
+    Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
+
+    // t=11: r3's allocation rises to 10
+    when(mClock.getTime()).thenReturn(11L);
+    planFollower.run();
+
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, defQ.getNumApplications());
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, defQ.getNumApplications());
+    }
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
+
+    // t=12: r3's allocation peaks at 20
+    when(mClock.getTime()).thenReturn(12L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNotNull(q3);
+    Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
+    Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
+
+    // t=16: every reservation has ended, so no reservation queue remains and
+    // the default queue should hold nearly all of the plan's capacity
+    when(mClock.getTime()).thenReturn(16L);
+    planFollower.run();
+
+    q = scheduler.getQueue(r1.toString());
+    assertNull(q);
+    q2 = scheduler.getQueue(r2.toString());
+    assertNull(q2);
+    q3 = scheduler.getQueue(r3.toString());
+    assertNull(q3);
+
+    assertTrue(defQ.getCapacity() > 0.9);
+
+  }
+
+  /**
+   * Builds an {@link ApplicationACLsManager} over a freshly constructed
+   * {@link Configuration}. Despite the name, this returns a real instance,
+   * not a Mockito mock.
+   *
+   * @return a new ACLs manager instance
+   */
+  public static ApplicationACLsManager mockAppACLsManager() {
+    return new ApplicationACLsManager(new Configuration());
+  }
+
+  /**
+   * Stops the scheduler used by the test, if one was created.
+   *
+   * @throws Exception if stopping the scheduler fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (scheduler == null) {
+      return;
+    }
+    scheduler.stop();
+  }
+
+}