You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:39 UTC

[24/33] git commit: YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit 8baeaead8532898163f1006276b731a237b1a559)

YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit 8baeaead8532898163f1006276b731a237b1a559)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
(cherry picked from commit 6261f7cc69a0eb3eebc9898c7599c7c20f432b4e)
(cherry picked from commit cbfbdf60d69e70c1821d5b3d343a4f5c0c2a410f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d244b2ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d244b2ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d244b2ae

Branch: refs/heads/branch-2.6
Commit: d244b2ae3ae9f2f30fdb319dbd8e4cc0ae67fda5
Parents: 8e92b3e
Author: subru <su...@outlook.com>
Authored: Thu Sep 18 15:30:27 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:29:13 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  19 +
 .../hadoop/yarn/client/api/YarnClient.java      | 102 ++++
 .../yarn/client/api/impl/YarnClientImpl.java    |  25 +
 .../yarn/client/api/impl/TestYarnClient.java    | 115 ++++
 .../server/resourcemanager/AdminService.java    |   6 +
 .../server/resourcemanager/ClientRMService.java | 201 +++++++
 .../server/resourcemanager/RMAuditLogger.java   |   5 +
 .../yarn/server/resourcemanager/RMContext.java  |   2 +
 .../server/resourcemanager/RMContextImpl.java   |  11 +
 .../server/resourcemanager/ResourceManager.java |  38 ++
 .../reservation/AbstractReservationSystem.java  | 323 +++++++++++
 .../reservation/CapacityReservationSystem.java  | 146 +++++
 .../reservation/ReservationInputValidator.java  | 244 ++++++++
 .../reservation/ReservationSystem.java          | 125 +++++
 .../server/resourcemanager/rmapp/RMApp.java     |   3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java |   5 +
 .../scheduler/event/AppAddedSchedulerEvent.java |  16 +-
 .../resourcemanager/TestClientRMService.java    | 116 ++++
 .../TestCapacityReservationSystem.java          | 102 ++++
 .../TestReservationInputValidator.java          | 560 +++++++++++++++++++
 21 files changed, 2165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index 56b3c12..c4106b2 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1712. Plan follower that synchronizes the current state of reservation
 subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
+
+YARN-2080. Integrating reservation system with ResourceManager and 
+client-RM protocol. (Subru Krishnan and Carlo Curino  via subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5fd83eb..6574f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -176,6 +176,25 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = 
       false;
 
+  /** Whether the RM should enable Reservation System */
+  public static final String RM_RESERVATION_SYSTEM_ENABLE = RM_PREFIX
+      + "reservation-system.enable";
+  public static final boolean DEFAULT_RM_RESERVATION_SYSTEM_ENABLE = false;
+
+  /** The class to use as the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_CLASS = RM_PREFIX
+      + "reservation-system.class";
+
+  /** The PlanFollower for the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER = RM_PREFIX
+      + "reservation-system.plan.follower";
+
+  /** The step size of the Reservation System. */
+  public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+      RM_PREFIX + "reservation-system.planfollower.time-step";
+  public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+      1000L;
+
   /**
    * Enable periodic monitor threads.
    * @see #RM_SCHEDULER_MONITOR_POLICIES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index 9e27de5..d697de9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -27,10 +27,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -43,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -474,4 +482,98 @@ public abstract class YarnClient extends AbstractService {
    */
   public abstract void moveApplicationAcrossQueues(ApplicationId appId,
       String queue) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to submit a new reservation to the
+   * {@link ResourceManager}.
+   * </p>
+   * 
+   * <p>
+   * The client packages all details of its request in a
+   * {@link ReservationRequest} object. This contains information about the
+   * amount of capacity, temporal constraints, and gang needs. Furthermore, the
+   * reservation might be composed of multiple stages, with ordering
+   * dependencies among them.
+   * </p>
+   * 
+   * <p>
+   * In order to respond, a new admission control component in the
+   * {@link ResourceManager} performs an analysis of the resources that have
+   * been committed over the period of time the user is requesting, checks that
+   * the user requests can be fulfilled, and that they respect a sharing policy
+   * (e.g., {@link CapacityOverTimePolicy}). Once it has positively determined
+   * that the ReservationRequest is satisfiable the {@link ResourceManager}
+   * answers with a {@link ReservationResponse} that includes a
+   * {@link ReservationId}. Upon failure to find a valid allocation the response
+   * is an exception with the message detailing the reason of failure.
+   * </p>
+   * 
+   * <p>
+   * The semantics guarantee that the ReservationId returned corresponds to a
+   * valid reservation existing in the time-range requested by the user. The
+   * amount of capacity dedicated to such reservation can vary over time,
+   * depending on the allocation that has been determined. But it is guaranteed
+   * to satisfy all the constraints expressed by the user in the
+   * {@link ReservationRequest}.
+   * </p>
+   * 
+   * @param request request to submit a new Reservation
+   * @return response contains the {@link ReservationId} on accepting the
+   *         submission
+   * @throws YarnException if the reservation cannot be created successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to update an existing Reservation. This is
+   * referred to as a re-negotiation process, in which a user that has
+   * previously submitted a Reservation asks to modify it.
+   * </p>
+   * 
+   * <p>
+   * The allocation is attempted by virtually substituting all previous
+   * allocations related to this Reservation with new ones, that satisfy the new
+   * {@link ReservationRequest}. Upon success the previous allocation is
+   * atomically substituted by the new one, and on failure (i.e., if the system
+   * cannot find a valid allocation for the updated request), the previous
+   * allocation remains valid.
+   * </p>
+   * 
+   * @param request request to update an existing Reservation (it should refer
+   *          to an existing valid {@link ReservationId})
+   * @return response empty on successfully updating the existing reservation
+   * @throws YarnException if the request is invalid or reservation cannot be
+   *           updated successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The interface used by clients to remove an existing Reservation.
+   * </p>
+   * 
+   * @param request request to remove an existing Reservation (it should refer
+   *          to an existing valid {@link ReservationId})
+   * @return response empty on successfully deleting the existing reservation
+   * @throws YarnException if the request is invalid or reservation cannot be
+   *           deleted successfully
+   * @throws IOException
+   * 
+   */
+  @Public
+  @Unstable
+  public abstract ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index def6da5..02c5a74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -63,6 +63,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -646,4 +652,23 @@ public class YarnClientImpl extends YarnClient {
         MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
     rmClient.moveApplicationAcrossQueues(request);
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return rmClient.submitReservation(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return rmClient.updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return rmClient.deleteReservation(request);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 3c1b1c1..d7bea7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,6 +64,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -76,6 +83,11 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -89,8 +101,14 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -834,4 +852,101 @@ public class TestYarnClient {
       client.stop();
     }
   }
+  
+  @Test
+  public void testReservationAPIs() {
+    // set up a CapacityScheduler-backed mini cluster with reservations enabled
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    MiniYARNCluster cluster =
+        new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
+    YarnClient client = null;
+    try {
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+      client = YarnClient.createYarnClient();
+      client.init(yarnConf);
+      client.start();
+
+      // create a reservation; the deadline leaves 5% slack over the duration
+      Clock clock = new UTCClock();
+      long arrival = clock.getTime();
+      long duration = 60000;
+      long deadline = (long) (arrival + 1.05 * duration);
+      ReservationSubmissionRequest sRequest =
+          createSimpleReservationRequest(4, arrival, deadline, duration);
+      ReservationSubmissionResponse sResponse = null;
+      try {
+        sResponse = client.submitReservation(sRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(sResponse);
+      ReservationId reservationID = sResponse.getReservationId();
+      Assert.assertNotNull(reservationID);
+      System.out.println("Submit reservation response: " + reservationID);
+
+      // Update the reservation: 5 containers over a shorter, re-based window
+      ReservationDefinition rDef = sRequest.getReservationDefinition();
+      ReservationRequest rr =
+          rDef.getReservationRequests().getReservationResources().get(0);
+      rr.setNumContainers(5);
+      arrival = clock.getTime();
+      duration = 30000;
+      deadline = (long) (arrival + 1.05 * duration);
+      rr.setDuration(duration);
+      rDef.setArrival(arrival);
+      rDef.setDeadline(deadline);
+      ReservationUpdateRequest uRequest =
+          ReservationUpdateRequest.newInstance(rDef, reservationID);
+      ReservationUpdateResponse uResponse = null;
+      try {
+        uResponse = client.updateReservation(uRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(uResponse);
+      System.out.println("Update reservation response: " + uResponse);
+
+      // Delete the reservation created above
+      ReservationDeleteRequest dRequest =
+          ReservationDeleteRequest.newInstance(reservationID);
+      ReservationDeleteResponse dResponse = null;
+      try {
+        dResponse = client.deleteReservation(dRequest);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertNotNull(dResponse);
+      System.out.println("Delete reservation response: " + dResponse);
+    } finally {
+      // clean-up: stop the client (if it was created) before the cluster
+      if (client != null) {
+        client.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testYarnClient#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            ReservationSystemTestUtil.reservationQ);
+    return request;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index ff0a249..2b7797f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -348,6 +349,11 @@ public class AdminService extends CompositeService implements
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
       rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+      // refresh the reservation system
+      ReservationSystem rSystem = rmContext.getReservationSystem();
+      if (rSystem != null) {
+        rSystem.reinitialize(getConfig(), rmContext);
+      }
       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
           "AdminService");
       return response;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index ed251e7..2b4196f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.AccessControlException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -79,6 +80,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,6 +100,8 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -107,6 +116,10 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -123,7 +136,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
@@ -153,10 +168,23 @@ public class ClientRMService extends AbstractService implements
   private final ApplicationACLsManager applicationsACLsManager;
   private final QueueACLsManager queueACLsManager;
 
+  // For Reservation APIs
+  private Clock clock;
+  private ReservationSystem reservationSystem;
+  private ReservationInputValidator rValidator;
+
   public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
       RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
       QueueACLsManager queueACLsManager,
       RMDelegationTokenSecretManager rmDTSecretManager) {
+    this(rmContext, scheduler, rmAppManager, applicationACLsManager,
+        queueACLsManager, rmDTSecretManager, new UTCClock());
+  }
+
+  public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
+      RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
+      QueueACLsManager queueACLsManager,
+      RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.rmContext = rmContext;
@@ -164,6 +192,9 @@ public class ClientRMService extends AbstractService implements
     this.applicationsACLsManager = applicationACLsManager;
     this.queueACLsManager = queueACLsManager;
     this.rmDTSecretManager = rmDTSecretManager;
+    this.reservationSystem = rmContext.getReservationSystem();
+    this.clock = clock;
+    this.rValidator = new ReservationInputValidator(clock);
   }
 
   @Override
@@ -1032,4 +1063,174 @@ public class ClientRMService extends AbstractService implements
   public Server getServer() {
     return this.server;
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    ReservationSubmissionResponse response =
+        recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
+    // Create a new Reservation Id
+    ReservationId reservationId = reservationSystem.getNewReservationId();
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationSubmissionRequest(reservationSystem,
+            request, reservationId);
+    // Check ACLs
+    String queueName = request.getQueue();
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    try {
+      // Try to place the reservation using the plan's reservation agent
+      boolean result =
+          plan.getReservationAgent().createReservation(reservationId, user,
+              plan, request.getReservationDefinition());
+      if (result) {
+        // record the queue for this reservation id in the reservation system;
+        // NOTE(review): a false result still hits logSuccess below - confirm
+        reservationSystem.setQueueForReservation(reservationId, queueName);
+        // create the reservation synchronously if required
+        refreshScheduler(queueName, request.getReservationDefinition(),
+            reservationId.toString());
+        // return the reservation id
+        response.setReservationId(reservationId);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to create the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
+    ReservationUpdateResponse response =
+        recordFactory.newRecordInstance(ReservationUpdateResponse.class);
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationUpdateRequest(reservationSystem, request);
+    ReservationId reservationId = request.getReservationId();
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    // Check ACLs
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    // Try to update the reservation using the plan's reservation agent
+    try {
+      boolean result =
+          plan.getReservationAgent().updateReservation(reservationId, user,
+              plan, request.getReservationDefinition());
+      if (!result) {
+        String errMsg = "Unable to update reservation: " + reservationId;
+        RMAuditLogger.logFailure(user,
+            AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg,
+            "ClientRMService", errMsg);
+        throw RPCUtil.getRemoteException(errMsg);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to update the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    // Check if reservation system is enabled
+    checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
+    ReservationDeleteResponse response =
+        recordFactory.newRecordInstance(ReservationDeleteResponse.class);
+    // Validate the input
+    Plan plan =
+        rValidator.validateReservationDeleteRequest(reservationSystem, request);
+    ReservationId reservationId = request.getReservationId();
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    // Check ACLs
+    String user =
+        checkReservationACLs(queueName,
+            AuditConstants.DELETE_RESERVATION_REQUEST);
+    // Try to delete the reservation using the plan's reservation agent
+    try {
+      boolean result =
+          plan.getReservationAgent().deleteReservation(reservationId, user,
+              plan);
+      if (!result) {
+        String errMsg = "Could not delete reservation: " + reservationId;
+        RMAuditLogger.logFailure(user,
+            AuditConstants.DELETE_RESERVATION_REQUEST, errMsg,
+            "ClientRMService", errMsg);
+        throw RPCUtil.getRemoteException(errMsg);
+      }
+    } catch (PlanningException e) {
+      RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Unable to delete the reservation: " + reservationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+    RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+        "ClientRMService: " + reservationId);
+    return response;
+  }
+
+  private void checkReservationSytem(String auditConstant) throws YarnException {
+    // Fail when no reservation system is set (auditConstant is unused here)
+    if (reservationSystem == null) {
+      throw RPCUtil.getRemoteException("Reservation is not enabled."
+          + " Please enable & try again");
+    }
+  }
+
+  /**
+   * Synchronizes the given plan with the scheduler immediately when the
+   * reservation's arrival time is less than one plan follower time step away
+   * from the current clock time; otherwise does nothing.
+   *
+   * @param planName name of the plan to synchronize
+   * @param contract definition of the reservation whose arrival time is
+   *          checked
+   * @param reservationId string form of the reservation id, used only in log
+   *          messages
+   */
+  private void refreshScheduler(String planName,
+      ReservationDefinition contract, String reservationId) {
+    if ((contract.getArrival() - clock.getTime()) < reservationSystem
+        .getPlanFollowerTimeStep()) {
+      // Avoid paying for MessageFormat when debug logging is switched off
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(MessageFormat
+            .format(
+                "Reservation {0} is within threshold so attempting to create synchronously.",
+                reservationId));
+      }
+      reservationSystem.synchronizePlan(planName);
+      LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
+          reservationId));
+    }
+  }
+
+  /**
+   * Checks that the current caller holds
+   * {@link QueueACL#SUBMIT_APPLICATIONS} on the given queue and returns the
+   * caller's short user name.
+   *
+   * @param queueName queue the reservation operation targets
+   * @param auditConstant operation name written to the audit log on failure
+   * @return short user name of the caller
+   * @throws YarnException if the caller's identity cannot be determined or the
+   *           caller lacks access to the queue
+   */
+  private String checkReservationACLs(String queueName, String auditConstant)
+      throws YarnException {
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName,
+          "ClientRMService", "Error getting UGI");
+      throw RPCUtil.getRemoteException(ie);
+    }
+    // Check if user has access on the managed queue
+    if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
+        queueName)) {
+      RMAuditLogger.logFailure(
+          callerUGI.getShortUserName(),
+          auditConstant,
+          "User doesn't have permissions to "
+              + QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
+          AuditConstants.UNAUTHORIZED_USER);
+      // Keep a space before the queue name so the message stays readable
+      throw RPCUtil.getRemoteException(new AccessControlException("User "
+          + callerUGI.getShortUserName() + " cannot perform operation "
+          + QueueACL.SUBMIT_APPLICATIONS.name() + " on queue " + queueName));
+    }
+    return callerUGI.getShortUserName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index 9ae09a4..6dd67c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -56,6 +56,11 @@ public class RMAuditLogger {
 
     // Some commonly used descriptions
     public static final String UNAUTHORIZED_USER = "Unauthorized user";
+    
+    // For Reservation system
+    public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
+    public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
+    public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 60f88f6..46ecfcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -109,5 +109,7 @@ public interface RMContext {
   
   long getEpoch();
 
+  ReservationSystem getReservationSystem();
+
   boolean isSchedulerReadyForAllocatingContainers();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 36eec04..78787ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -83,6 +84,7 @@ public class RMContextImpl implements RMContext {
   private ClientRMService clientRMService;
   private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
   private ResourceScheduler scheduler;
+  private ReservationSystem reservationSystem;
   private NodesListManager nodesListManager;
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
@@ -209,6 +211,11 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
+  public ReservationSystem getReservationSystem() {
+    return this.reservationSystem;
+  }
+  
+  @Override
   public NodesListManager getNodesListManager() {
     return this.nodesListManager;
   }
@@ -303,6 +310,10 @@ public class RMContextImpl implements RMContext {
   void setScheduler(ResourceScheduler scheduler) {
     this.scheduler = scheduler;
   }
+  
+  void setReservationSystem(ReservationSystem reservationSystem) {
+    this.reservationSystem = reservationSystem;
+  }
 
   void setDelegationTokenRenewer(
       DelegationTokenRenewer delegationTokenRenewer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 79af7a6..3e5f138 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -147,6 +149,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected RMSecretManagerService rmSecretManagerService;
 
   protected ResourceScheduler scheduler;
+  protected ReservationSystem reservationSystem;
   private ClientRMService clientRM;
   protected ApplicationMasterService masterService;
   protected NMLivelinessMonitor nmLivelinessMonitor;
@@ -281,6 +284,29 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
+  protected ReservationSystem createReservationSystem() {
+    String reservationClassName =
+        conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
+            AbstractReservationSystem.getDefaultReservationSystem(scheduler));
+    if (reservationClassName == null) {
+      return null;
+    }
+    LOG.info("Using ReservationSystem: " + reservationClassName);
+    try {
+      Class<?> reservationClazz = Class.forName(reservationClassName);
+      if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
+        return (ReservationSystem) ReflectionUtils.newInstance(
+            reservationClazz, this.conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + reservationClassName
+            + " not instance of " + ReservationSystem.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate ReservationSystem: " + reservationClassName, e);
+    }
+  }
+
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
   }
@@ -456,6 +482,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
 
+      // Initialize the Reservation system
+      if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
+          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
+        reservationSystem = createReservationSystem();
+        if (reservationSystem != null) {
+          reservationSystem.setRMContext(rmContext);
+          addIfService(reservationSystem);
+          rmContext.setReservationSystem(reservationSystem);
+          LOG.info("Initialized Reservation system");
+        }
+      }
+
       // creating monitors that handle preemption
       createPolicyMonitors();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
new file mode 100644
index 0000000..f0a9543
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -0,0 +1,323 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link ResourceScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public abstract class AbstractReservationSystem extends AbstractService
+    implements ReservationSystem {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AbstractReservationSystem.class);
+
+  // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
+
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock(true);
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private boolean initialized = false;
+
+  private final Clock clock = new UTCClock();
+
+  private AtomicLong resCounter = new AtomicLong();
+
+  private Map<String, Plan> plans = new HashMap<String, Plan>();
+
+  private Map<ReservationId, String> resQMap =
+      new HashMap<ReservationId, String>();
+
+  private RMContext rmContext;
+
+  private ResourceScheduler scheduler;
+
+  private ScheduledExecutorService scheduledExecutorService;
+
+  protected Configuration conf;
+
+  protected long planStepSize;
+
+  private PlanFollower planFollower;
+
+  /**
+   * Construct the service.
+   * 
+   * @param name service name
+   */
+  public AbstractReservationSystem(String name) {
+    super(name);
+  }
+
+  @Override
+  public void setRMContext(RMContext rmContext) {
+    writeLock.lock();
+    try {
+      this.rmContext = rmContext;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException {
+    writeLock.lock();
+    try {
+      if (!initialized) {
+        initialize(conf);
+        initialized = true;
+      } else {
+        initializeNewPlans(conf);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void initialize(Configuration conf) throws YarnException {
+    LOG.info("Initializing Reservation system");
+    this.conf = conf;
+    scheduler = rmContext.getScheduler();
+    // Get the plan step size
+    planStepSize =
+        conf.getTimeDuration(
+            YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+            TimeUnit.MILLISECONDS);
+    if (planStepSize < 0) {
+      planStepSize =
+          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
+    }
+    // Create a plan corresponding to every reservable queue
+    Set<String> planQueueNames = scheduler.getPlanQueues();
+    for (String planQueueName : planQueueNames) {
+      Plan plan = initializePlan(planQueueName);
+      plans.put(planQueueName, plan);
+    }
+  }
+
+  /**
+   * Creates a plan for every plan queue reported by the scheduler that does
+   * not have one yet, then hands the full set of plans to the plan follower
+   * if one exists. A {@link YarnException} raised while initializing a plan
+   * is logged and not rethrown.
+   *
+   * @param conf the configuration passed to reinitialize; not read here
+   */
+  private void initializeNewPlans(Configuration conf) {
+    LOG.info("Refreshing Reservation system");
+    writeLock.lock();
+    try {
+      // Create a plan corresponding to every new reservable queue
+      Set<String> planQueueNames = scheduler.getPlanQueues();
+      for (String planQueueName : planQueueNames) {
+        if (!plans.containsKey(planQueueName)) {
+          Plan plan = initializePlan(planQueueName);
+          plans.put(planQueueName, plan);
+        } else {
+          // SLF4J substitutes "{}" placeholders, not MessageFormat's "{0}"
+          LOG.warn("Plan based on reservation queue {} already exists.",
+              planQueueName);
+        }
+      }
+      // Update the plan follower with the active plans
+      if (planFollower != null) {
+        planFollower.setPlans(plans.values());
+      }
+    } catch (YarnException e) {
+      LOG.warn("Exception while trying to refresh reservable queues", e);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private PlanFollower createPlanFollower() {
+    String planFollowerPolicyClassName =
+        conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
+            getDefaultPlanFollower());
+    if (planFollowerPolicyClassName == null) {
+      return null;
+    }
+    LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
+    try {
+      Class<?> planFollowerPolicyClazz =
+          conf.getClassByName(planFollowerPolicyClassName);
+      if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
+        return (PlanFollower) ReflectionUtils.newInstance(
+            planFollowerPolicyClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+            + " not instance of " + PlanFollower.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate PlanFollowerPolicy: "
+              + planFollowerPolicyClassName, e);
+    }
+  }
+
+  private String getDefaultPlanFollower() {
+    // currently only capacity scheduler is supported
+    if (scheduler instanceof CapacityScheduler) {
+      return CapacitySchedulerPlanFollower.class.getName();
+    }
+    return null;
+  }
+
+  @Override
+  public Plan getPlan(String planName) {
+    readLock.lock();
+    try {
+      return plans.get(planName);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * @return the planStepSize
+   */
+  @Override
+  public long getPlanFollowerTimeStep() {
+    readLock.lock();
+    try {
+      return planStepSize;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Asks the plan follower to synchronize the named plan while holding the
+   * write lock. Does nothing when the plan is unknown or when no plan
+   * follower has been created.
+   *
+   * @param planName name of the plan to synchronize
+   */
+  @Override
+  public void synchronizePlan(String planName) {
+    writeLock.lock();
+    try {
+      Plan plan = plans.get(planName);
+      // planFollower is null when createPlanFollower() found no class to
+      // use, so guard it the same way serviceInit/serviceStart do
+      if (plan != null && planFollower != null) {
+        planFollower.synchronizePlan(plan);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Configuration configuration = new Configuration(conf);
+    reinitialize(configuration, rmContext);
+    // Create the plan follower with the active plans
+    planFollower = createPlanFollower();
+    if (planFollower != null) {
+      planFollower.init(clock, scheduler, plans.values());
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    if (planFollower != null) {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+      scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
+          planStepSize, TimeUnit.MILLISECONDS);
+    }
+    super.serviceStart();
+  }
+
+  /**
+   * Shuts down the executor that runs the plan follower, if one was started,
+   * and removes all plans.
+   */
+  @Override
+  public void serviceStop() {
+    // Stop the plan follower
+    if (scheduledExecutorService != null
+        && !scheduledExecutorService.isShutdown()) {
+      scheduledExecutorService.shutdown();
+    }
+    // Clear the plans under the write lock, matching the other code paths
+    // that modify or read the plans map
+    writeLock.lock();
+    try {
+      plans.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public String getQueueForReservation(ReservationId reservationId) {
+    readLock.lock();
+    try {
+      return resQMap.get(reservationId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void setQueueForReservation(ReservationId reservationId,
+      String queueName) {
+    writeLock.lock();
+    try {
+      resQMap.put(reservationId, queueName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public ReservationId getNewReservationId() {
+    writeLock.lock();
+    try {
+      ReservationId resId =
+          ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
+              resCounter.incrementAndGet());
+      LOG.info("Allocated new reservationId: " + resId);
+      return resId;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Map<String, Plan> getAllPlans() {
+    return plans;
+  }
+
+  /**
+   * Get the default reservation system corresponding to the scheduler
+   * 
+   * @param scheduler the scheduler for which the reservation system is required
+   * @return the class name of {@link CapacityReservationSystem} when the
+   *         scheduler is a {@link CapacityScheduler}, otherwise {@code null}
+   */
+  public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
+    // currently only capacity scheduler is supported
+    if (scheduler instanceof CapacityScheduler) {
+      return CapacityReservationSystem.class.getName();
+    }
+    return null;
+  }
+
+  protected abstract Plan initializePlan(String planQueueName)
+      throws YarnException;
+
+  protected abstract Planner getReplanner(String planQueueName);
+
+  protected abstract ReservationAgent getAgent(String queueName);
+
+  protected abstract SharingPolicy getAdmissionPolicy(String queueName);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
new file mode 100644
index 0000000..548fde1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
@@ -0,0 +1,146 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link CapacityScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class CapacityReservationSystem extends AbstractReservationSystem {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CapacityReservationSystem.class);
+
+  private CapacityScheduler capScheduler;
+
+  public CapacityReservationSystem() {
+    super(CapacityReservationSystem.class.getName());
+  }
+
+  @Override
+  public void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException {
+    // Fail fast on any other scheduler; the cast below requires this type
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    if (!(scheduler instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class "
+          + scheduler.getClass().getCanonicalName() + " not instance of "
+          + CapacityScheduler.class.getCanonicalName());
+    }
+    capScheduler = (CapacityScheduler) scheduler;
+    this.conf = conf;
+    super.reinitialize(conf, rmContext);
+  }
+
+  /** Builds the {@link InMemoryPlan} that backs the given reservable queue. */
+  @Override
+  protected Plan initializePlan(String planQueueName) throws YarnException {
+    SharingPolicy adPolicy = getAdmissionPolicy(planQueueName);
+    CSQueue planQueue = capScheduler.getQueue(planQueueName);
+    String planQueuePath = planQueue.getQueuePath();
+    adPolicy.init(planQueuePath, capScheduler.getConfiguration());
+    // Calculate the max plan capacity
+    Resource minAllocation = capScheduler.getMinimumResourceCapability();
+    ResourceCalculator rescCalc = capScheduler.getResourceCalculator();
+    Resource totCap =
+        rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
+            planQueue.getAbsoluteCapacity(), minAllocation);
+    Plan plan = new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy,
+        getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
+        capScheduler.getMaximumResourceCapability(), planQueueName,
+        getReplanner(planQueuePath),
+        capScheduler.getConfiguration().getMoveOnExpiry(planQueuePath));
+    LOG.info("Initialized plan {} based on reservable queue {}", plan,
+        planQueueName);
+    return plan;
+  }
+
+  @Override
+  protected Planner getReplanner(String planQueueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String plannerClassName = capSchedulerConfig.getReplanner(planQueueName);
+    LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+        + planQueueName);
+    try {
+      Class<?> plannerClazz =
+          capSchedulerConfig.getClassByName(plannerClassName);
+      if (Planner.class.isAssignableFrom(plannerClazz)) {
+        Planner planner =
+            (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
+        planner.init(planQueueName, capSchedulerConfig);
+        return planner;
+      } else {
+        throw new YarnRuntimeException("Class: " + plannerClazz
+            + " not instance of " + Planner.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate Planner: "
+          + plannerClassName + " for queue: " + planQueueName, e);
+    }
+  }
+
+  @Override
+  protected ReservationAgent getAgent(String queueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String agentClassName = capSchedulerConfig.getReservationAgent(queueName);
+    LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
+    try {
+      Class<?> agentClazz = capSchedulerConfig.getClassByName(agentClassName);
+      if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
+        return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + agentClassName
+            + " not instance of " + ReservationAgent.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate Agent: "
+          + agentClassName + " for queue: " + queueName, e);
+    }
+  }
+
+  @Override
+  protected SharingPolicy getAdmissionPolicy(String queueName) {
+    CapacitySchedulerConfiguration capSchedulerConfig =
+        capScheduler.getConfiguration();
+    String admissionPolicyClassName =
+        capSchedulerConfig.getReservationAdmissionPolicy(queueName);
+    LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+        + " for queue: " + queueName);
+    try {
+      Class<?> admissionPolicyClazz =
+          capSchedulerConfig.getClassByName(admissionPolicyClassName);
+      if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
+        return (SharingPolicy) ReflectionUtils.newInstance(
+            admissionPolicyClazz, conf);
+      } else {
+        throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+            + " not instance of " + SharingPolicy.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+          + admissionPolicyClassName + " for queue: " + queueName, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
new file mode 100644
index 0000000..678773d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -0,0 +1,244 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class ReservationInputValidator {
+
+  private final Clock clock;
+
+  /**
+   * Creates a validator of reservation requests timed by {@code clock}.
+   */
+  public ReservationInputValidator(Clock clock) {
+    this.clock = clock;
+  }
+
+  private Plan validateReservation(ReservationSystem reservationSystem,
+      ReservationId reservationId, String auditConstant) throws YarnException {
+    String message = "";
+    // check if the reservation id is valid
+    if (reservationId == null) {
+      message =
+          "Missing reservation id."
+              + " Please try again by specifying a reservation id.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    String queueName = reservationSystem.getQueueForReservation(reservationId);
+    if (queueName == null) {
+      message =
+          "The specified reservation with ID: " + reservationId
+              + " is unknown. Please try again with a valid reservation.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // check if the associated plan is valid
+    Plan plan = reservationSystem.getPlan(queueName);
+    if (plan == null) {
+      message =
+          "The specified reservation: " + reservationId
+              + " is not associated with any valid plan."
+              + " Please try again with a valid reservation.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    return plan;
+  }
+
+  /** Validates the timing and size constraints of a reservation definition. */
+  private void validateReservationDefinition(ReservationId reservationId,
+      ReservationDefinition contract, Plan plan, String auditConstant)
+      throws YarnException {
+    // a reservation definition is mandatory
+    if (contract == null) {
+      String message =
+          "Missing reservation definition."
+              + " Please try again by specifying a reservation definition.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // check if deadline is in the past
+    if (contract.getDeadline() <= clock.getTime()) {
+      String message =
+          "The specified deadline: " + contract.getDeadline()
+              + " is the past. Please try again with deadline in the future.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // Check if at least one RR has been specified
+    ReservationRequests resReqs = contract.getReservationRequests();
+    if (resReqs == null) {
+      String message =
+          "No resources have been specified to reserve."
+              + " Please try again by specifying the resources to reserve.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    List<ReservationRequest> resReq = resReqs.getReservationResources();
+    if (resReq == null || resReq.isEmpty()) {
+      String message =
+          "No resources have been specified to reserve."
+              + " Please try again by specifying the resources to reserve.";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // compute minimum duration and max gang size
+    long minDuration = 0;
+    Resource maxGangSize = Resource.newInstance(0, 0);
+    ReservationRequestInterpreter type = resReqs.getInterpreter();
+    for (ReservationRequest rr : resReq) {
+      if (type == ReservationRequestInterpreter.R_ALL
+          || type == ReservationRequestInterpreter.R_ANY) {
+        minDuration = Math.max(minDuration, rr.getDuration());
+      } else {
+        minDuration += rr.getDuration();
+      }
+      maxGangSize =
+          Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
+              maxGangSize,
+              Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+    }
+    // verify the allocation is possible (skip for ANY)
+    if (contract.getDeadline() - contract.getArrival() < minDuration
+        && type != ReservationRequestInterpreter.R_ANY) {
+      String message =
+          "The time difference ("
+              + (contract.getDeadline() - contract.getArrival())
+              + ") between arrival (" + contract.getArrival() + ") "
+              + "and deadline (" + contract.getDeadline() + ") must "
+              + " be greater or equal to the minimum resource duration ("
+              + minDuration + ")";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    // check that the largest gang does not exceed the inventory available
+    // capacity (skip for ANY)
+    if (Resources.greaterThan(plan.getResourceCalculator(),
+        plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
+        && type != ReservationRequestInterpreter.R_ANY) {
+      String message =
+          "The size of the largest gang in the reservation refinition ("
+              + maxGangSize + ") exceed the capacity available ("
+              + plan.getTotalCapacity() + " )";
+      RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+          "validate reservation input definition", "ClientRMService", message);
+      throw RPCUtil.getRemoteException(message);
+    }
+  }
+
+  /**
+   * Quick, fail-fast validation of the input that checks some obvious failure
+   * conditions and returns the appropriate {@link Plan} associated with the
+   * specified {@link Queue}, or throws an exception whose message illustrates
+   * the details of the validation check that failed
+   *
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationSubmissionRequest} defining the
+   *          resources required over time for the request
+   * @param reservationId the {@link ReservationId} associated with the current
+   *          request
+   * @return the {@link Plan} to submit the request to
+   * @throws YarnException if the queue or the reservation definition is invalid
+   */
+  public Plan validateReservationSubmissionRequest(
+      ReservationSystem reservationSystem,
+      ReservationSubmissionRequest request, ReservationId reservationId)
+      throws YarnException {
+    // Check if it is a managed queue
+    String queueName = request.getQueue();
+    if (queueName == null || queueName.isEmpty()) {
+      String errMsg =
+          "The queue to submit is not specified."
+              + " Please try again with a valid reservable queue.";
+      RMAuditLogger.logFailure("UNKNOWN",
+          AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          "validate reservation input", "ClientRMService", errMsg);
+      throw RPCUtil.getRemoteException(errMsg);
+    }
+    Plan plan = reservationSystem.getPlan(queueName);
+    if (plan == null) {
+      String errMsg =
+          "The specified queue: " + queueName
+              + " is not managed by reservation system."
+              + " Please try again with a valid reservable queue.";
+      RMAuditLogger.logFailure("UNKNOWN",
+          AuditConstants.SUBMIT_RESERVATION_REQUEST,
+          "validate reservation input", "ClientRMService", errMsg);
+      throw RPCUtil.getRemoteException(errMsg);
+    }
+    validateReservationDefinition(reservationId,
+        request.getReservationDefinition(), plan,
+        AuditConstants.SUBMIT_RESERVATION_REQUEST);
+    return plan;
+  }
+
+  /**
+   * Quick, fail-fast validation of the input that checks some obvious failure
+   * conditions and returns the {@link Plan} associated with the reservation
+   * being updated, or throws an exception whose message illustrates the
+   * details of the validation check that failed
+   *
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationUpdateRequest} defining the resources
+   *          required over time for the request
+   * @return the {@link Plan} to submit the request to
+   * @throws YarnException if the reservation or its new definition is invalid
+   */
+  public Plan validateReservationUpdateRequest(
+      ReservationSystem reservationSystem, ReservationUpdateRequest request)
+      throws YarnException {
+    ReservationId reservationId = request.getReservationId();
+    Plan plan =
+        validateReservation(reservationSystem, reservationId,
+            AuditConstants.UPDATE_RESERVATION_REQUEST);
+    validateReservationDefinition(reservationId,
+        request.getReservationDefinition(), plan,
+        AuditConstants.UPDATE_RESERVATION_REQUEST);
+    return plan;
+  }
+
+  /**
+   * Quick, fail-fast validation of the input that checks some obvious failure
+   * conditions and returns the {@link Plan} associated with the reservation
+   * being deleted, or throws an exception whose message illustrates the
+   * details of the validation check that failed
+   *
+   * @param reservationSystem the {@link ReservationSystem} to validate against
+   * @param request the {@link ReservationDeleteRequest} identifying the
+   *          reservation to delete
+   * @return the {@link Plan} associated with the reservation to delete
+   * @throws YarnException if the reservation id cannot be resolved to a plan
+   */
+  public Plan validateReservationDeleteRequest(
+      ReservationSystem reservationSystem, ReservationDeleteRequest request)
+      throws YarnException {
+    return validateReservation(reservationSystem, request.getReservationId(),
+        AuditConstants.DELETE_RESERVATION_REQUEST);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
new file mode 100644
index 0000000..cb76dcf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+/**
+ * This interface is the one implemented by any system that wants to support
+ * Reservations, i.e. make {@link Resource} allocations in the future.
+ * Implementors need to bootstrap all configured {@link Plan}s in the active
+ * {@link ResourceScheduler} along with their corresponding
+ * {@link ReservationAgent} and {@link SharingPolicy}. It is also responsible
+ * for managing the {@link PlanFollower} to ensure the {@link Plan}s are in sync
+ * with the {@link ResourceScheduler}.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public interface ReservationSystem {
+
+  /**
+   * Set RMContext for {@link ReservationSystem}. This method should be called
+   * once, immediately after instantiating a reservation system.
+   * 
+   * @param rmContext created by {@link ResourceManager}
+   */
+  void setRMContext(RMContext rmContext);
+
+  /**
+   * Re-initialize the {@link ReservationSystem}.
+   * 
+   * @param conf configuration
+   * @param rmContext current context of the {@link ResourceManager}
+   * @throws YarnException
+   */
+  void reinitialize(Configuration conf, RMContext rmContext)
+      throws YarnException;
+
+  /**
+   * Get an existing {@link Plan} that has been initialized.
+   * 
+   * @param planName the name of the {@link Plan}
+   * @return the {@link Plan} identified by name
+   * 
+   */
+  Plan getPlan(String planName);
+
+  /**
+   * Return a map containing all the plans known to this ReservationSystem
+   * (useful for UI)
+   * 
+   * @return a Map of Plan names and Plan objects
+   */
+  Map<String, Plan> getAllPlans();
+
+  /**
+   * Invokes {@link PlanFollower} to synchronize the specified {@link Plan} with
+   * the {@link ResourceScheduler}
+   * 
+   * @param planName the name of the {@link Plan} to be synchronized
+   */
+  void synchronizePlan(String planName);
+
+  /**
+   * Return the time step (ms) at which the {@link PlanFollower} is invoked
+   * 
+   * @return the time step (ms) at which the {@link PlanFollower} is invoked
+   */
+  long getPlanFollowerTimeStep();
+
+  /**
+   * Get a new unique {@link ReservationId}.
+   * 
+   * @return a new unique {@link ReservationId}
+   * 
+   */
+  ReservationId getNewReservationId();
+
+  /**
+   * Get the {@link Queue} that an existing {@link ReservationId} is associated
+   * with.
+   * 
+   * @param reservationId the unique id of the reservation
+   * @return the name of the associated Queue
+   * 
+   */
+  String getQueueForReservation(ReservationId reservationId);
+
+  /**
+   * Set the {@link Queue} that an existing {@link ReservationId} should be
+   * associated with.
+   * 
+   * @param reservationId the unique id of the reservation
+   * @param queueName the name of Queue to associate the reservation with
+   * 
+   */
+  void setQueueForReservation(ReservationId reservationId, String queueName);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index a1ae3ca..624aa18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -236,4 +237,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return metrics
    */
   RMAppMetrics getRMAppMetrics();
+
+  ReservationId getReservationId();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 4899434..84ec766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -1284,4 +1285,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.systemClock = clock;
   }
 
+  @Override
+  public ReservationId getReservationId() {
+    return submissionContext.getReservationID();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d244b2ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index 7e0b89e..a54e4bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,25 +19,33 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 public class AppAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final ReservationId reservationID;
   private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
-    this(applicationId, queue, user, false);
+    this(applicationId, queue, user, false, null);
   }
 
   public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
-      String user, boolean isAppRecovering) {
+      String user, ReservationId reservationID) {
+    this(applicationId, queue, user, false, reservationID);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering, ReservationId reservationID) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.reservationID = reservationID;
     this.isAppRecovering = isAppRecovering;
   }
 
@@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
   public boolean getIsAppRecovering() {
     return isAppRecovering;
   }
+
+  public ReservationId getReservationID() {
+    return reservationID;
+  }
 }