You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2015/07/25 16:49:40 UTC

[1/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino) (cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 621203bf4 -> 26ea04581


http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
new file mode 100644
index 0000000..bd18a2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -0,0 +1,611 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *  
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestGreedyReservationAgent {
+
+  ReservationAgent agent;
+  InMemoryPlan plan;
+  Resource minAlloc = Resource.newInstance(1024, 1);
+  ResourceCalculator res = new DefaultResourceCalculator();
+  Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+  Random rand = new Random();
+  long step;
+
+  /**
+   * Prepares the fixture shared by all tests: re-seeds the random generator
+   * (logging the seed), then builds a {@link GreedyReservationAgent} and an
+   * {@link InMemoryPlan} over a cluster of 100 * 1024 memory and 100 cores,
+   * governed by a {@link CapacityOverTimePolicy} with loose quotas.
+   *
+   * @throws Exception if any of the collaborators fails to initialize
+   */
+  @Before
+  public void setup() throws Exception {
+    // pick a seed, apply it and log it together with the run
+    final long randomSeed = rand.nextLong();
+    rand.setSeed(randomSeed);
+    Log.info("Running with seed: " + randomSeed);
+
+    step = 1000L;
+    final Resource totalCapacity = Resource.newInstance(100 * 1024, 100);
+
+    final ReservationSystemTestUtil util = new ReservationSystemTestUtil();
+    final String queueName = util.getFullReservationQueueName();
+
+    // setting completely loose quotas
+    final long window = 1000000L;
+    final float instantaneousLimit = 100;
+    final float averageLimit = 100;
+    final ReservationSchedulerConfiguration schedulerConf =
+        ReservationSystemTestUtil.createConf(queueName, window,
+            instantaneousLimit, averageLimit);
+
+    final CapacityOverTimePolicy sharingPolicy = new CapacityOverTimePolicy();
+    sharingPolicy.init(queueName, schedulerConf);
+
+    agent = new GreedyReservationAgent();
+
+    final QueueMetrics metrics = mock(QueueMetrics.class);
+    plan = new InMemoryPlan(metrics, sharingPolicy, agent, totalCapacity,
+        step, res, minAlloc, maxAlloc, "dedicated", null, true);
+  }
+
+  @SuppressWarnings("javadoc")
+  @Test
+  public void testSimple() throws PlanningException {
+
+    prepareBasicPlan();
+
+    // create a request with a single atomic ask
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(5 * step);
+    rr.setDeadline(20 * step);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 5, 10 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setReservationResources(Collections.singletonList(r));
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+    for (long i = 10 * step; i < 20 * step; i++) {
+      assertTrue(
+          "Agent-based allocation unexpected",
+          Resources.equals(cs.getResourcesAtTime(i),
+              Resource.newInstance(2048 * 10, 2 * 10)));
+    }
+
+  }
+
+  /**
+   * Submits an R_ORDER request of four stages against a plan that contains a
+   * completely utilized segment around time 30 * step, and verifies that the
+   * agent accepts it and places the stages in [0, 10), [10, 30), [40, 50) and
+   * [50, 70) (in units of step).
+   *
+   * @throws PlanningException if the plan or the agent rejects an operation
+   */
+  @Test
+  public void testOrder() throws PlanningException {
+    prepareBasicPlan();
+
+    // create a completely utilized segment around time 30
+    int[] f = { 100, 100 };
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", 30 * step, 30 * step + f.length * step,
+            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+            res, minAlloc)));
+
+    // create a chain of 4 RR, mixing gang and non-gang
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0 * step);
+    rr.setDeadline(70 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20 * step);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent, keeping its verdict (a null-check on the id, which is
+    // created before the call, cannot detect a rejected request)
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    boolean accepted = agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate: two basic reservations + the full segment + this one
+    assertTrue("Agent-based allocation failed", accepted);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 4);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
+
+    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ORDER_NO_GAP request that the agent is expected to reject
+   * with a {@link PlanningException}, and verifies that the plan still holds
+   * only the three reservations that were added up front.
+   *
+   * @throws PlanningException if setting up the plan fails
+   */
+  @Test
+  public void testOrderNoGapImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create a completely utilized segment at time 30
+    int[] f = { 100, 100 };
+
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+            "dedicated", 30 * step, 30 * step + f.length * step,
+            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
+            res, minAlloc)));
+
+    // create a chain of 4 RR, mixing gang and non-gang
+    // NOTE(review): arrival, deadline and durations below are raw values, not
+    // multiples of step as in testOrder, so the window [0, 70] does not reach
+    // the saturated segment at 30 * step -- confirm that the rejection is
+    // caused by the intended conflict.
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0L);
+
+    rr.setDeadline(70L);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    try {
+      // submit to agent; any normal return (true or false) is a failure
+      agent.createReservation(reservationID, "u1", plan, rr);
+      fail("Agent-based allocation should have failed");
+    } catch (PlanningException p) {
+      // expected
+    }
+
+    // validate: the rejected request must not have been added to the plan
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 3);
+
+    System.out
+        .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+            + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  @Test
+  public void testOrderNoGap() throws PlanningException {
+    prepareBasicPlan();
+    // create a chain of 4 RR, mixing gang and non-gang
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(0 * step);
+    rr.setDeadline(60 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 1, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 10, 10, 20 * step);
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+    // validate
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
+
+  }
+
+  @Test
+  public void testSingleSliding() throws PlanningException {
+    prepareBasicPlan();
+
+    // create a single request for which we need subsequent (tight) packing.
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100 * step);
+    rr.setDeadline(120 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 200, 10, 10 * step);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate results, we expect the second one to be accepted
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
+
+    System.out.println("--------AFTER packed ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ANY request with three alternatives, the last of which is
+   * impossible, and verifies that the agent accepts the request and that the
+   * allocation in [110 * step, 120 * step) matches the second alternative
+   * (20 times Resource(1024, 1)).
+   *
+   * @throws PlanningException if the plan or the agent rejects an operation
+   */
+  @Test
+  public void testAny() throws PlanningException {
+    prepareBasicPlan();
+
+    // create an ANY request, with an impossible step (last in list, first
+    // considered),
+    // and two satisfiable ones. We expect the second one to be returned.
+    ReservationRequest small = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 5, 5, 10 * step);
+    ReservationRequest medium = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 5, 10 * step);
+    ReservationRequest impossible = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 110, 110, 10 * step);
+
+    List<ReservationRequest> alternatives =
+        new ArrayList<ReservationRequest>();
+    alternatives.add(small);
+    alternatives.add(medium);
+    alternatives.add(impossible);
+
+    ReservationRequests requests = new ReservationRequestsPBImpl();
+    requests.setInterpreter(ReservationRequestInterpreter.R_ANY);
+    requests.setReservationResources(alternatives);
+
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setArrival(100 * step);
+    definition.setDeadline(120 * step);
+    definition.setReservationRequests(requests);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    boolean accepted =
+        agent.createReservation(reservationID, "u1", plan, definition);
+
+    // validate results, we expect the second one to be accepted
+    assertTrue("Agent-based allocation failed", accepted);
+    int reservationCount = plan.getAllReservations().size();
+    assertTrue("Agent-based allocation failed", reservationCount == 3);
+
+    ReservationAllocation allocation = plan.getReservationById(reservationID);
+
+    assertTrue(allocation.toString(),
+        check(allocation, 110 * step, 120 * step, 20, 1024, 1));
+
+    System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+        + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ANY request whose alternatives are all impossible and
+   * verifies that the agent rejects it with a {@link PlanningException},
+   * leaving only the two reservations from {@link #prepareBasicPlan()} in the
+   * plan.
+   *
+   * @throws PlanningException if setting up the plan fails
+   */
+  @Test
+  public void testAnyImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create an ANY request, with all impossible alternatives
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100L);
+    rr.setDeadline(120L);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
+
+    // longer than arrival-deadline
+    ReservationRequest r1 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 35, 5, 30);
+    // above max cluster size
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 110, 110, 10);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r1);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    try {
+      // submit to agent; any normal return (true or false) is a failure
+      agent.createReservation(reservationID, "u1", plan, rr);
+      fail("Agent-based allocation should have failed");
+    } catch (PlanningException p) {
+      // expected
+    }
+    // validate results: none of the alternatives may end up in the plan
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 2);
+
+    System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  @Test
+  public void testAll() throws PlanningException {
+    prepareBasicPlan();
+    // create an ALL request
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100 * step);
+    rr.setDeadline(120 * step);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 5, 5, 10 * step);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 10, 10, 20 * step);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    // submit to agent
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr);
+
+    // validate results, we expect the second one to be accepted
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 3);
+
+    ReservationAllocation cs = plan.getReservationById(reservationID);
+
+    assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
+    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
+
+    System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+        + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Submits an R_ALL request with an impossible combination of asks and
+   * verifies that the agent rejects it with a {@link PlanningException},
+   * leaving only the two reservations from {@link #prepareBasicPlan()} in the
+   * plan.
+   *
+   * @throws PlanningException if setting up the plan fails
+   */
+  @Test
+  public void testAllImpossible() throws PlanningException {
+    prepareBasicPlan();
+    // create an ALL request, with an impossible combination, it should be
+    // rejected, and allocation remain unchanged
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(100L);
+    rr.setDeadline(120L);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+    ReservationRequest r = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 55, 5, 10);
+    ReservationRequest r2 = ReservationRequest.newInstance(
+        Resource.newInstance(2048, 2), 55, 5, 20);
+
+    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
+    list.add(r);
+    list.add(r2);
+    reqs.setReservationResources(list);
+    rr.setReservationRequests(reqs);
+
+    ReservationId reservationID = ReservationSystemTestUtil
+        .getNewReservationId();
+    try {
+      // submit to agent; any normal return (true or false) is a failure
+      agent.createReservation(reservationID, "u1", plan, rr);
+      fail("Agent-based allocation should have failed");
+    } catch (PlanningException p) {
+      // expected
+    }
+
+    // validate results: the plan must be unchanged by the rejected request
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 2);
+
+    System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+        + reservationID + ")----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+
+  }
+
+  /**
+   * Seeds the plan with two controlled reservations for user "u1" so that the
+   * agent works against a non-empty plan: one starting at time 0 and one
+   * starting at time 5000, each spanning seven steps. The resulting plan is
+   * printed to standard output.
+   *
+   * @throws PlanningException if the plan rejects one of the reservations
+   */
+  private void prepareBasicPlan() throws PlanningException {
+
+    // insert in the reservation a couple of controlled reservations, to create
+    // conditions for assignment that are non-empty
+
+    // first reservation: starts at 0
+    int[] firstProfile = { 10, 10, 20, 20, 20, 10, 10 };
+    String planBeforeFirst = plan.toString();
+    InMemoryReservationAllocation first = new InMemoryReservationAllocation(
+        ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+        "dedicated", 0L, 0L + firstProfile.length * step,
+        ReservationSystemTestUtil.generateAllocation(0, step, firstProfile),
+        res, minAlloc);
+    assertTrue(planBeforeFirst, plan.addReservation(first));
+
+    // second reservation: starts at 5000
+    int[] secondProfile = { 5, 5, 5, 5, 5, 5, 5 };
+    Map<ReservationInterval, Resource> secondAllocation =
+        ReservationSystemTestUtil.generateAllocation(5000, step,
+            secondProfile);
+    String planBeforeSecond = plan.toString();
+    InMemoryReservationAllocation second = new InMemoryReservationAllocation(
+        ReservationSystemTestUtil.getNewReservationId(), null, "u1",
+        "dedicated", 5000, 5000 + secondProfile.length * step,
+        secondAllocation, res, minAlloc);
+    assertTrue(planBeforeSecond, plan.addReservation(second));
+
+    System.out.println("--------BEFORE AGENT----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+  }
+
+  /**
+   * Tells whether the allocation holds exactly {@code containers} times
+   * Resource({@code mem}, {@code cores}) at every instant of
+   * [{@code start}, {@code end}).
+   *
+   * @param cs the allocation to inspect
+   * @param start first instant checked (inclusive)
+   * @param end instant at which the check stops (exclusive)
+   * @param containers expected number of containers
+   * @param mem memory of a single container
+   * @param cores cores of a single container
+   * @return {@code true} if every instant matches (or the range is empty),
+   *         {@code false} as soon as one instant differs
+   */
+  private boolean check(ReservationAllocation cs, long start, long end,
+      int containers, int mem, int cores) {
+
+    for (long t = start; t < end; t++) {
+      Resource actual = cs.getResourcesAtTime(t);
+      Resource expected =
+          Resource.newInstance(mem * containers, cores * containers);
+      if (!Resources.equals(actual, expected)) {
+        // the first mismatch settles the answer
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stress run (invoked from {@link #main(String[])}, not a JUnit test):
+   * rebuilds the plan over a larger cluster, generates {@code numJobs} random
+   * reservation definitions, submits them one by one to the agent and prints
+   * how many were accepted and how long the submissions took.
+   *
+   * @param numJobs number of random reservation definitions to submit
+   * @throws PlanningException declared by the plan/policy set-up calls
+   * @throws IOException declared by the scheduler/configuration set-up calls
+   */
+  public void testStress(int numJobs) throws PlanningException, IOException {
+
+    final long window = 1000000L;
+    final Resource totalCapacity =
+        Resource.newInstance(500 * 100 * 1024, 500 * 32);
+    step = 1000L;
+
+    final ReservationSystemTestUtil util = new ReservationSystemTestUtil();
+    final CapacityScheduler capScheduler =
+        util.mockCapacityScheduler(500 * 100);
+    final String queueName = util.getFullReservationQueueName();
+
+    final float instantaneousLimit = 100;
+    final float averageLimit = 100;
+    final ReservationSchedulerConfiguration schedulerConf =
+        ReservationSystemTestUtil.createConf(queueName, window,
+            instantaneousLimit, averageLimit);
+    final CapacityOverTimePolicy sharingPolicy = new CapacityOverTimePolicy();
+    sharingPolicy.init(queueName, schedulerConf);
+
+    plan = new InMemoryPlan(capScheduler.getRootQueueMetrics(), sharingPolicy,
+        agent, totalCapacity, step, res, minAlloc, maxAlloc, "dedicated",
+        null, true);
+
+    // generate all the jobs up front so that only submissions are timed
+    List<ReservationDefinition> jobs = new ArrayList<ReservationDefinition>();
+    for (long jobIndex = 0; jobIndex < numJobs; jobIndex++) {
+      jobs.add(ReservationSystemTestUtil.generateRandomRR(rand, jobIndex));
+    }
+
+    int accepted = 0;
+    int submitted = 0;
+    final long begin = System.currentTimeMillis();
+    for (ReservationDefinition job : jobs) {
+      try {
+        boolean ok = agent.createReservation(
+            ReservationSystemTestUtil.getNewReservationId(),
+            "u" + submitted % 100, plan, job);
+        if (ok) {
+          accepted++;
+        }
+      } catch (PlanningException p) {
+        // ignore exceptions
+      }
+      submitted++;
+    }
+    final long finish = System.currentTimeMillis();
+
+    System.out.println("Submitted " + numJobs + " jobs " + " accepted "
+        + accepted + " in " + (finish - begin) + "ms");
+  }
+
+  /**
+   * Command-line entry point for the stress run.
+   *
+   * @param arg optional single argument: the number of random jobs to submit
+   *          (1000 when omitted)
+   */
+  public static void main(String[] arg) {
+
+    // run a stress test with by default 1000 random jobs
+    final int numJobs = (arg.length > 0) ? Integer.parseInt(arg[0]) : 1000;
+
+    try {
+      TestGreedyReservationAgent stressRun = new TestGreedyReservationAgent();
+      stressRun.setup();
+      stressRun.testStress(numJobs);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
new file mode 100644
index 0000000..aeb1e6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -0,0 +1,170 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *  
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Test;
+
+public class TestSimpleCapacityReplanner {
+
+  @Test
+  public void testReplanningPlanCapacityLoss() throws PlanningException {
+
+    Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
+    Resource minAlloc = Resource.newInstance(1024, 1);
+    Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    ResourceCalculator res = new DefaultResourceCalculator();
+    long step = 1L;
+    Clock clock = mock(Clock.class);
+    ReservationAgent agent = mock(ReservationAgent.class);
+
+    SharingPolicy policy = new NoOverCommitPolicy();
+    policy.init("root.dedicated", null);
+
+    QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+    when(clock.getTime()).thenReturn(0L);
+    SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
+
+    ReservationSchedulerConfiguration conf =
+        mock(ReservationSchedulerConfiguration.class);
+    when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
+
+    enf.init("blah", conf);
+
+    // Initialize the plan with more resources
+    InMemoryPlan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+            res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+
+    // add reservation filling the plan (separating them 1ms, so we are sure
+    // s2 follows s1 on acceptance
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f5 = { 20, 20, 20, 20, 20 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(1L);
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(2L);
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(3L);
+    ReservationId r4 = ReservationId.newInstance(ts, 4);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(4L);
+    ReservationId r5 = ReservationId.newInstance(ts, 5);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+
+    int[] f6 = { 50, 50, 50, 50, 50 };
+    ReservationId r6 = ReservationId.newInstance(ts, 6);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(6L);
+    ReservationId r7 = ReservationId.newInstance(ts, 7);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+
+    // remove some of the resources (requires replanning)
+    plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
+
+    when(clock.getTime()).thenReturn(0L);
+
+    // run the replanner
+    enf.plan(plan, null);
+
+    // check which reservation are still present
+    assertNotNull(plan.getReservationById(r1));
+    assertNotNull(plan.getReservationById(r2));
+    assertNotNull(plan.getReservationById(r3));
+    assertNotNull(plan.getReservationById(r6));
+    assertNotNull(plan.getReservationById(r7));
+
+    // and which ones are removed
+    assertNull(plan.getReservationById(r4));
+    assertNull(plan.getReservationById(r5));
+
+    // check resources at each moment in time no more exceed capacity
+    for (int i = 0; i < 20; i++) {
+      int tot = 0;
+      for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
+        tot = r.getResourcesAtTime(i).getMemory();
+      }
+      assertTrue(tot <= 70 * 1024);
+    }
+  }
+
+  /**
+   * Builds an allocation made of consecutive unit-length intervals starting
+   * at {@code startTime}; the i-th interval is mapped to the resource obtained
+   * from a request of {@code alloc[i]} containers of Resource(1024, 1).
+   *
+   * @param startTime start of the first interval
+   * @param alloc number of containers for each consecutive interval
+   * @return the interval-to-resource map, one entry per element of
+   *         {@code alloc}
+   */
+  private Map<ReservationInterval, Resource> generateAllocation(
+      int startTime, int[] alloc) {
+    Map<ReservationInterval, Resource> allocation =
+        new TreeMap<ReservationInterval, Resource>();
+    int slotStart = startTime;
+    for (int containers : alloc) {
+      ReservationInterval slot =
+          new ReservationInterval(slotStart, slotStart + 1);
+      ReservationRequest ask = ReservationRequest.newInstance(
+          Resource.newInstance(1024, 1), containers);
+      allocation.put(slot, ReservationSystemUtil.toResource(ask));
+      slotStart++;
+    }
+    return allocation;
+  }
+
+}


[2/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino) (cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)

Posted by cu...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
deleted file mode 100644
index de94dcd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java
+++ /dev/null
@@ -1,604 +0,0 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *  
- *       http://www.apache.org/licenses/LICENSE-2.0
- *  
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Before;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-public class TestGreedyReservationAgent {
-
-  ReservationAgent agent;
-  InMemoryPlan plan;
-  Resource minAlloc = Resource.newInstance(1024, 1);
-  ResourceCalculator res = new DefaultResourceCalculator();
-  Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
-  Random rand = new Random();
-  long step;
-
-  @Before
-  public void setup() throws Exception {
-
-    long seed = rand.nextLong();
-    rand.setSeed(seed);
-    Log.info("Running with seed: " + seed);
-
-    // setting completely loose quotas
-    long timeWindow = 1000000L;
-    Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
-    step = 1000L;
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
-
-    float instConstraint = 100;
-    float avgConstraint = 100;
-
-    ReservationSchedulerConfiguration conf =
-        ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
-            instConstraint, avgConstraint);
-    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
-    policy.init(reservationQ, conf);
-    agent = new GreedyReservationAgent();
-
-    QueueMetrics queueMetrics = mock(QueueMetrics.class);
-
-    plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-        res, minAlloc, maxAlloc, "dedicated", null, true);
-  }
-
-  @SuppressWarnings("javadoc")
-  @Test
-  public void testSimple() throws PlanningException {
-
-    prepareBasicPlan();
-
-    // create a request with a single atomic ask
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(5 * step);
-    rr.setDeadline(20 * step);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 5, 10 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setReservationResources(Collections.singletonList(r));
-    rr.setReservationRequests(reqs);
-
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    agent.createReservation(reservationID, "u1", plan, rr);
-
-    assertTrue("Agent-based allocation failed", reservationID != null);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 3);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-    for (long i = 10 * step; i < 20 * step; i++) {
-      assertTrue(
-          "Agent-based allocation unexpected",
-          Resources.equals(cs.getResourcesAtTime(i),
-              Resource.newInstance(2048 * 10, 2 * 10)));
-    }
-
-  }
-
-  @Test
-  public void testOrder() throws PlanningException {
-    prepareBasicPlan();
-
-    // create a completely utilized segment around time 30
-    int[] f = { 100, 100 };
-
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-            "dedicated", 30 * step, 30 * step + f.length * step,
-            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
-
-    // create a chain of 4 RR, mixing gang and non-gang
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(0 * step);
-    rr.setDeadline(70 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 1, 10 * step);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 10, 10, 20 * step);
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    list.add(r);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    // submit to agent
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    agent.createReservation(reservationID, "u1", plan, rr);
-
-    // validate
-    assertTrue("Agent-based allocation failed", reservationID != null);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 4);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
-
-    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testOrderNoGapImpossible() throws PlanningException {
-    prepareBasicPlan();
-    // create a completely utilized segment at time 30
-    int[] f = { 100, 100 };
-
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-            "dedicated", 30 * step, 30 * step + f.length * step,
-            ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
-
-    // create a chain of 4 RR, mixing gang and non-gang
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(0L);
-
-    rr.setDeadline(70L);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 1, 10);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 10, 10, 20);
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    list.add(r);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    boolean result = false;
-    try {
-      // submit to agent
-      result = agent.createReservation(reservationID, "u1", plan, rr);
-      fail();
-    } catch (PlanningException p) {
-      // expected
-    }
-
-    // validate
-    assertFalse("Agent-based allocation should have failed", result);
-    assertTrue("Agent-based allocation should have failed", plan
-        .getAllReservations().size() == 3);
-
-    System.out
-        .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
-            + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testOrderNoGap() throws PlanningException {
-    prepareBasicPlan();
-    // create a chain of 4 RR, mixing gang and non-gang
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(0 * step);
-    rr.setDeadline(60 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 1, 10 * step);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 10, 10, 20 * step);
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    list.add(r);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-    rr.setReservationRequests(reqs);
-
-    // submit to agent
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    agent.createReservation(reservationID, "u1", plan, rr);
-
-    System.out.println("--------AFTER ORDER ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-    // validate
-    assertTrue("Agent-based allocation failed", reservationID != null);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 3);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
-
-  }
-
-  @Test
-  public void testSingleSliding() throws PlanningException {
-    prepareBasicPlan();
-
-    // create a single request for which we need subsequent (tight) packing.
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(100 * step);
-    rr.setDeadline(120 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 200, 10, 10 * step);
-
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    // submit to agent
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    agent.createReservation(reservationID, "u1", plan, rr);
-
-    // validate results, we expect the second one to be accepted
-    assertTrue("Agent-based allocation failed", reservationID != null);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 3);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
-
-    System.out.println("--------AFTER packed ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testAny() throws PlanningException {
-    prepareBasicPlan();
-    // create an ANY request, with an impossible step (last in list, first
-    // considered),
-    // and two satisfiable ones. We expect the second one to be returned.
-
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(100 * step);
-    rr.setDeadline(120 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 5, 5, 10 * step);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 5, 10 * step);
-    ReservationRequest r3 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 110, 110, 10 * step);
-
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    list.add(r3);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    // submit to agent
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    boolean res = agent.createReservation(reservationID, "u1", plan, rr);
-
-    // validate results, we expect the second one to be accepted
-    assertTrue("Agent-based allocation failed", res);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 3);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
-
-    System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
-        + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testAnyImpossible() throws PlanningException {
-    prepareBasicPlan();
-    // create an ANY request, with all impossible alternatives
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(100L);
-    rr.setDeadline(120L);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
-
-    // longer than arrival-deadline
-    ReservationRequest r1 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 35, 5, 30);
-    // above max cluster size
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 110, 110, 10);
-
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r1);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    boolean result = false;
-    try {
-      // submit to agent
-      result = agent.createReservation(reservationID, "u1", plan, rr);
-      fail();
-    } catch (PlanningException p) {
-      // expected
-    }
-    // validate results, we expect the second one to be accepted
-    assertFalse("Agent-based allocation should have failed", result);
-    assertTrue("Agent-based allocation should have failed", plan
-        .getAllReservations().size() == 2);
-
-    System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testAll() throws PlanningException {
-    prepareBasicPlan();
-    // create an ALL request
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(100 * step);
-    rr.setDeadline(120 * step);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 5, 5, 10 * step);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 10, 10, 20 * step);
-
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    // submit to agent
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    agent.createReservation(reservationID, "u1", plan, rr);
-
-    // validate results, we expect the second one to be accepted
-    assertTrue("Agent-based allocation failed", reservationID != null);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 3);
-
-    ReservationAllocation cs = plan.getReservationById(reservationID);
-
-    assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
-    assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
-
-    System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
-        + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  @Test
-  public void testAllImpossible() throws PlanningException {
-    prepareBasicPlan();
-    // create an ALL request, with an impossible combination, it should be
-    // rejected, and allocation remain unchanged
-    ReservationDefinition rr = new ReservationDefinitionPBImpl();
-    rr.setArrival(100L);
-    rr.setDeadline(120L);
-    ReservationRequests reqs = new ReservationRequestsPBImpl();
-    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
-    ReservationRequest r = ReservationRequest.newInstance(
-        Resource.newInstance(1024, 1), 55, 5, 10);
-    ReservationRequest r2 = ReservationRequest.newInstance(
-        Resource.newInstance(2048, 2), 55, 5, 20);
-
-    List<ReservationRequest> list = new ArrayList<ReservationRequest>();
-    list.add(r);
-    list.add(r2);
-    reqs.setReservationResources(list);
-    rr.setReservationRequests(reqs);
-
-    ReservationId reservationID = ReservationSystemTestUtil
-        .getNewReservationId();
-    boolean result = false;
-    try {
-      // submit to agent
-      result = agent.createReservation(reservationID, "u1", plan, rr);
-      fail();
-    } catch (PlanningException p) {
-      // expected
-    }
-
-    // validate results, we expect the second one to be accepted
-    assertFalse("Agent-based allocation failed", result);
-    assertTrue("Agent-based allocation failed", plan.getAllReservations()
-        .size() == 2);
-
-    System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
-        + reservationID + ")----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-
-  }
-
-  private void prepareBasicPlan() throws PlanningException {
-
-    // insert in the reservation a couple of controlled reservations, to create
-    // conditions for assignment that are non-empty
-
-    int[] f = { 10, 10, 20, 20, 20, 10, 10 };
-
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-            "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
-                .generateAllocation(0, step, f), res, minAlloc)));
-
-    int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
-    Map<ReservationInterval, Resource> alloc =
-        ReservationSystemTestUtil.generateAllocation(5000, step, f2);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(
-            ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-            "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
-
-    System.out.println("--------BEFORE AGENT----------");
-    System.out.println(plan.toString());
-    System.out.println(plan.toCumulativeString());
-  }
-
-  private boolean check(ReservationAllocation cs, long start, long end,
-      int containers, int mem, int cores) {
-
-    boolean res = true;
-    for (long i = start; i < end; i++) {
-      res = res
-          && Resources.equals(cs.getResourcesAtTime(i),
-              Resource.newInstance(mem * containers, cores * containers));
-    }
-    return res;
-  }
-
-  public void testStress(int numJobs) throws PlanningException, IOException {
-
-    long timeWindow = 1000000L;
-    Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
-    step = 1000L;
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
-    String reservationQ = testUtil.getFullReservationQueueName();
-    float instConstraint = 100;
-    float avgConstraint = 100;
-    ReservationSchedulerConfiguration conf =
-        ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
-            instConstraint, avgConstraint);
-    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
-    policy.init(reservationQ, conf);
-
-    plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
-      clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
-
-    int acc = 0;
-    List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
-    for (long i = 0; i < numJobs; i++) {
-      list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
-    }
-
-    long start = System.currentTimeMillis();
-    for (int i = 0; i < numJobs; i++) {
-
-      try {
-        if (agent.createReservation(
-            ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
-            plan, list.get(i))) {
-          acc++;
-        }
-      } catch (PlanningException p) {
-        // ignore exceptions
-      }
-    }
-
-    long end = System.currentTimeMillis();
-    System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
-        + " in " + (end - start) + "ms");
-  }
-
-  public static void main(String[] arg) {
-
-    // run a stress test with by default 1000 random jobs
-    int numJobs = 1000;
-    if (arg.length > 0) {
-      numJobs = Integer.parseInt(arg[0]);
-    }
-
-    try {
-      TestGreedyReservationAgent test = new TestGreedyReservationAgent();
-      test.setup();
-      test.testStress(numJobs);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 722fb29..b6d24b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index 1e15618..809892c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
index d0f4dc6..f0cc49c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
@@ -6,9 +6,9 @@
  *   to you under the Apache License, Version 2.0 (the
  *   "License"); you may not use this file except in compliance
  *   with the License.  You may obtain a copy of the License at
- *  
+ *
  *       http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -164,6 +164,53 @@ public class TestRLESparseResourceAllocation {
     Assert.assertTrue(rleSparseVector.isEmpty());
   }
 
+  /**
+   * Verifies toIntervalMap() on an empty and on a populated allocation.
+   */
+  @Test
+  public void testToIntervalMap() {
+    ResourceCalculator resCalc = new DefaultResourceCalculator();
+    Resource minAlloc = Resource.newInstance(1, 1);
+    RLESparseResourceAllocation rleSparseVector =
+        new RLESparseResourceAllocation(resCalc, minAlloc);
+    // Check empty
+    Map<ReservationInterval, Resource> mapAllocations =
+        rleSparseVector.toIntervalMap();
+    Assert.assertTrue(mapAllocations.isEmpty());
+    // Check full: expect intervals starting at 101, 102, 104, 105 and 106
+    int[] alloc = { 0, 5, 10, 10, 5, 0, 5, 0 };
+    Set<Entry<ReservationInterval, Resource>> inputs =
+        generateAllocation(100, alloc, false).entrySet();
+    for (Entry<ReservationInterval, Resource> ip : inputs) {
+      rleSparseVector.addInterval(ip.getKey(), ip.getValue());
+    }
+    mapAllocations = rleSparseVector.toIntervalMap();
+    Assert.assertEquals(5, mapAllocations.size());
+    for (Entry<ReservationInterval, Resource> entry : mapAllocations
+        .entrySet()) {
+      ReservationInterval interval = entry.getKey();
+      Resource resource = entry.getValue();
+      if (interval.getStartTime() == 101L) {
+        Assert.assertEquals(102L, interval.getEndTime());
+        Assert.assertEquals(Resource.newInstance(5 * 1024, 5), resource);
+      } else if (interval.getStartTime() == 102L) {
+        Assert.assertEquals(104L, interval.getEndTime());
+        Assert.assertEquals(Resource.newInstance(10 * 1024, 10), resource);
+      } else if (interval.getStartTime() == 104L) {
+        Assert.assertEquals(105L, interval.getEndTime());
+        Assert.assertEquals(Resource.newInstance(5 * 1024, 5), resource);
+      } else if (interval.getStartTime() == 105L) {
+        Assert.assertEquals(106L, interval.getEndTime());
+        Assert.assertEquals(Resource.newInstance(0 * 1024, 0), resource);
+      } else if (interval.getStartTime() == 106L) {
+        Assert.assertEquals(107L, interval.getEndTime());
+        Assert.assertEquals(Resource.newInstance(5 * 1024, 5), resource);
+      } else {
+        Assert.fail("Unexpected interval: " + interval);
+      }
+    }
+  }
+
   private Map<ReservationInterval, Resource> generateAllocation(
       int startTime, int[] alloc, boolean isStep) {
     Map<ReservationInterval, Resource> req =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
index 50df8fe..f5625fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
deleted file mode 100644
index d4a97ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *  
- *       http://www.apache.org/licenses/LICENSE-2.0
- *  
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.junit.Test;
-
-public class TestSimpleCapacityReplanner {
-
-  @Test
-  public void testReplanningPlanCapacityLoss() throws PlanningException {
-
-    Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
-    Resource minAlloc = Resource.newInstance(1024, 1);
-    Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
-
-    ResourceCalculator res = new DefaultResourceCalculator();
-    long step = 1L;
-    Clock clock = mock(Clock.class);
-    ReservationAgent agent = mock(ReservationAgent.class);
-
-    SharingPolicy policy = new NoOverCommitPolicy();
-    policy.init("root.dedicated", null);
-
-    QueueMetrics queueMetrics = mock(QueueMetrics.class);
-
-    when(clock.getTime()).thenReturn(0L);
-    SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
-
-    ReservationSchedulerConfiguration conf =
-        mock(ReservationSchedulerConfiguration.class);
-    when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
-
-    enf.init("blah", conf);
-
-    // Initialize the plan with more resources
-    InMemoryPlan plan =
-        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
-            res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
-
-    // add reservation filling the plan (separating them 1ms, so we are sure
-    // s2 follows s1 on acceptance
-    long ts = System.currentTimeMillis();
-    ReservationId r1 = ReservationId.newInstance(ts, 1);
-    int[] f5 = { 20, 20, 20, 20, 20 };
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
-            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
-    when(clock.getTime()).thenReturn(1L);
-    ReservationId r2 = ReservationId.newInstance(ts, 2);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
-            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
-    when(clock.getTime()).thenReturn(2L);
-    ReservationId r3 = ReservationId.newInstance(ts, 3);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
-            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
-    when(clock.getTime()).thenReturn(3L);
-    ReservationId r4 = ReservationId.newInstance(ts, 4);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
-            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
-    when(clock.getTime()).thenReturn(4L);
-    ReservationId r5 = ReservationId.newInstance(ts, 5);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
-            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
-
-    int[] f6 = { 50, 50, 50, 50, 50 };
-    ReservationId r6 = ReservationId.newInstance(ts, 6);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
-            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
-    when(clock.getTime()).thenReturn(6L);
-    ReservationId r7 = ReservationId.newInstance(ts, 7);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
-            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
-
-    // remove some of the resources (requires replanning)
-    plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
-
-    when(clock.getTime()).thenReturn(0L);
-
-    // run the replanner
-    enf.plan(plan, null);
-
-    // check which reservation are still present
-    assertNotNull(plan.getReservationById(r1));
-    assertNotNull(plan.getReservationById(r2));
-    assertNotNull(plan.getReservationById(r3));
-    assertNotNull(plan.getReservationById(r6));
-    assertNotNull(plan.getReservationById(r7));
-
-    // and which ones are removed
-    assertNull(plan.getReservationById(r4));
-    assertNull(plan.getReservationById(r5));
-
-    // check resources at each moment in time no more exceed capacity
-    for (int i = 0; i < 20; i++) {
-      int tot = 0;
-      for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
-        tot = r.getResourcesAtTime(i).getMemory();
-      }
-      assertTrue(tot <= 70 * 1024);
-    }
-  }
-
-  private Map<ReservationInterval, Resource> generateAllocation(
-      int startTime, int[] alloc) {
-    Map<ReservationInterval, Resource> req =
-        new TreeMap<ReservationInterval, Resource>();
-    for (int i = 0; i < alloc.length; i++) {
-      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
-          ReservationSystemUtil.toResource(
-              ReservationRequest.newInstance(Resource.newInstance(1024, 1),
-                  alloc[i])));
-    }
-    return req;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
new file mode 100644
index 0000000..9a1621a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+public class TestAlignedPlanner {
+
+  ReservationAgent agent; // planning agent that the tests drive
+  InMemoryPlan plan; // plan the agent creates/updates/deletes reservations in
+  Resource minAlloc = Resource.newInstance(1024, 1);
+  ResourceCalculator res = new DefaultResourceCalculator();
+  Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+  Random rand = new Random();
+  long step; // time unit that arrivals, deadlines and durations are scaled by
+
+  @Test
+  public void testSingleReservationAccept() throws PlanningException {
+    // A single one-stage R_ORDER request must be added to the scenario-1 plan.
+    // Load scenario 1; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario1();
+
+    // Create reservation: one request, window [5, 20] * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            5 * step, // Job arrival time
+            20 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(2048, 2), // Capability
+                10, // Num containers
+                5, // Concurrency
+                10 * step) }, // Duration
+            ReservationRequestInterpreter.R_ORDER, "u1");
+
+    // Add reservation under a freshly generated ID
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr1);
+    // CHECK: allocation was accepted. NOTE(review): reservationID is never
+    // reassigned, so only the plan-size assertion reflects the agent's outcome
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == numJobsInScenario + 1);
+
+    // Look the new reservation up in the plan by its ID
+    ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+    // Verify via check(); presumably 10 x <2048, 2> over [10, 20) * step
+    assertTrue(alloc1.toString(),
+        check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
+
+  }
+
+  @Test
+  public void testOrderNoGapImpossible() throws PlanningException {
+    // A two-stage R_ORDER_NO_GAP request on scenario 2 must be rejected.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: two identical stages, 20 containers for one step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10L, // Job arrival time; NOTE(review): siblings use 10 * step
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step) }, // Duration
+            ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: reservation count is still the scenario baseline
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == numJobsInScenario);
+
+  }
+
+  @Test
+  public void testOrderNoGapImpossible2() throws PlanningException {
+    // R_ORDER_NO_GAP with a [10, 13] * step window must be rejected.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: a 20-container stage, then a 10-container stage
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            13 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    10, // Num containers
+                    10, // Concurrency
+                    step) }, // Duration
+            ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: reservation count is still the scenario baseline
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == numJobsInScenario);
+
+  }
+
+  @Test
+  public void testOrderImpossible() throws PlanningException {
+    // A two-stage R_ORDER request on scenario 2 must be rejected.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: a 2 * step stage followed by a 1 * step stage
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    2 * step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step) }, // Duration
+            ReservationRequestInterpreter.R_ORDER, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: reservation count is still the scenario baseline
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == numJobsInScenario);
+
+  }
+
+  @Test
+  public void testAnyImpossible() throws PlanningException {
+    // An R_ANY request listing two requests must be rejected on scenario 2.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: requests lasting 3 * step and 2 * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    3 * step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    2 * step) }, // Duration
+            ReservationRequestInterpreter.R_ANY, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: reservation count is still the scenario baseline
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == numJobsInScenario);
+
+  }
+
+  @Test
+  public void testAnyAccept() throws PlanningException {
+    // An R_ANY request listing two requests must be accepted on scenario 2.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: requests lasting 1 * step and 2 * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    2 * step) }, // Duration
+            ReservationRequestInterpreter.R_ANY, "u1");
+
+    // Add reservation under a freshly generated ID
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr1);
+    // CHECK: allocation was accepted. NOTE(review): reservationID is never
+    // reassigned, so only the plan-size assertion reflects the agent's outcome
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == numJobsInScenario + 1);
+
+    // Look the new reservation up in the plan by its ID
+    ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+    // Verify via check(); presumably 20 x <1024, 1> over [14, 15) * step
+    assertTrue(alloc1.toString(),
+        check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
+
+  }
+
+  @Test
+  public void testAllAccept() throws PlanningException {
+    // An R_ALL request with two one-step requests must fit scenario 2.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: two identical requests, 20 containers for one step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Add reservation under a freshly generated ID
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr1);
+    // CHECK: allocation was accepted. NOTE(review): reservationID is never
+    // reassigned, so only the plan-size assertion reflects the agent's outcome
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == numJobsInScenario + 1);
+
+    // Look the new reservation up in the plan by its ID
+    ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+    // Verify both expected intervals: [10, 11) * step and [14, 15) * step
+    assertTrue(alloc1.toString(),
+        check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
+    assertTrue(alloc1.toString(),
+        check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
+
+  }
+
+  @Test
+  public void testAllImpossible() throws PlanningException {
+    // An R_ALL request listing two requests must be rejected on scenario 2.
+    // Load scenario 2; the returned count is the baseline for the size check
+    int numJobsInScenario = initializeScenario2();
+
+    // Create reservation: requests lasting 1 * step and 2 * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] {
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    step), // Duration
+                ReservationRequest.newInstance(
+                    Resource.newInstance(1024, 1), // Capability
+                    20, // Num containers
+                    20, // Concurrency
+                    2 * step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: reservation count is still the scenario baseline
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == numJobsInScenario);
+
+  }
+
+  @Test
+  public void testUpdate() throws PlanningException {
+    // Updating a reservation after a competing one is deleted must succeed.
+    // Create flexible reservation: 2 * step of work in a [10, 14] * step window
+    ReservationDefinition rrFlex =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            14 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(1024, 1), // Capability
+                100, // Num containers
+                1, // Concurrency
+                2 * step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Create blocking reservation: 100 containers over [10, 11] * step
+    ReservationDefinition rrBlock =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            11 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(1024, 1), // Capability
+                100, // Num containers
+                100, // Concurrency
+                step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Create reservation IDs
+    ReservationId flexReservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    ReservationId blockReservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+
+    // Add block, add flex, remove block, update flex
+    agent.createReservation(blockReservationID, "uBlock", plan, rrBlock);
+    agent.createReservation(flexReservationID, "uFlex", plan, rrFlex);
+    agent.deleteReservation(blockReservationID, "uBlock", plan);
+    agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
+    // CHECK: only the flex reservation remains. NOTE(review): the ID is never
+    // reassigned, so only the plan-size assertion reflects the agent's outcome
+    assertTrue("Agent-based allocation failed", flexReservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == 1);
+
+    // Look the flex reservation up in the plan by its ID
+    ReservationAllocation alloc1 = plan.getReservationById(flexReservationID);
+
+    // Verify via check(); presumably 50 x <1024, 1> over [10, 14) * step
+    assertTrue(alloc1.toString(),
+        check(alloc1, 10 * step, 14 * step, 50, 1024, 1));
+
+  }
+
+  @Test
+  public void testImpossibleDuration() throws PlanningException {
+    // A 10 * step duration cannot fit a 5 * step window; expect rejection.
+    // Create reservation: window [10, 15] * step, duration 10 * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            15 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(1024, 1), // Capability
+                20, // Num containers
+                20, // Concurrency
+                10 * step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Add reservation; the agent is expected to throw PlanningException
+    try {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u1", plan, rr1);
+      fail(); // createReservation returned without throwing
+    } catch (PlanningException e) {
+      // Expected failure
+    }
+
+    // CHECK: the plan holds no reservations
+    assertTrue("Agent-based allocation should have failed", plan
+        .getAllReservations().size() == 0);
+
+  }
+
+  @Test
+  public void testLoadedDurationIntervals() throws PlanningException {
+    // An 80-container request over three steps must be accepted on scenario 3.
+    int numJobsInScenario = initializeScenario3();
+
+    // Create reservation: 80 containers, one step each, window [10, 13] * step
+    ReservationDefinition rr1 =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            13 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(1024, 1), // Capability
+                80, // Num containers
+                10, // Concurrency
+                step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Add reservation under a freshly generated ID
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    agent.createReservation(reservationID, "u1", plan, rr1);
+    // CHECK: allocation was accepted. NOTE(review): reservationID is never
+    // reassigned, so only the plan-size assertion reflects the agent's outcome
+    assertTrue("Agent-based allocation failed", reservationID != null);
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == numJobsInScenario + 1);
+
+    // Look the new reservation up in the plan by its ID
+    ReservationAllocation alloc1 = plan.getReservationById(reservationID);
+
+    // Verify the three steps; presumably 20, 20 and 40 containers (sum 80)
+    assertTrue(alloc1.toString(),
+        check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
+    assertTrue(alloc1.toString(),
+        check(alloc1, 11 * step, 12 * step, 20, 1024, 1));
+    assertTrue(alloc1.toString(),
+        check(alloc1, 12 * step, 13 * step, 40, 1024, 1));
+  }
+
+  @Test
+  public void testCostFunction() throws PlanningException {
+    // The third job should land in the step the first two do not request.
+    // Create large memory reservation, window [10, 11] * step
+    ReservationDefinition rr7Mem1Core =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            11 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(7 * 1024, 1), // Capability
+                1, // Num containers
+                1, // Concurrency
+                step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u1");
+
+    // Create (6 * 1024, 6) reservation in the same [10, 11] * step window
+    ReservationDefinition rr6Mem6Cores =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            11 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(6 * 1024, 6), // Capability
+                1, // Num containers
+                1, // Concurrency
+                step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u2");
+
+    // Create small (1024, 1) reservation with a wider [10, 12] * step window
+    ReservationDefinition rr =
+        createReservationDefinition(
+            10 * step, // Job arrival time
+            12 * step, // Job deadline
+            new ReservationRequest[] { ReservationRequest.newInstance(
+                Resource.newInstance(1024, 1), // Capability
+                1, // Num containers
+                1, // Concurrency
+                step) }, // Duration
+            ReservationRequestInterpreter.R_ALL, "u3");
+
+    // Create reservation IDs
+    ReservationId reservationID1 =
+        ReservationSystemTestUtil.getNewReservationId();
+    ReservationId reservationID2 =
+        ReservationSystemTestUtil.getNewReservationId();
+    ReservationId reservationID3 =
+        ReservationSystemTestUtil.getNewReservationId();
+
+    // Add all three, in order
+    agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
+    agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
+    agent.createReservation(reservationID3, "u3", plan, rr);
+
+    // Get the third reservation
+    ReservationAllocation alloc3 = plan.getReservationById(reservationID3);
+    // Presumably: 0 containers in [10, 11) * step, 1 in [11, 12) * step
+    assertTrue(alloc3.toString(),
+        check(alloc3, 10 * step, 11 * step, 0, 1024, 1));
+    assertTrue(alloc3.toString(),
+        check(alloc3, 11 * step, 12 * step, 1, 1024, 1));
+
+  }
+
+  @Test
+  public void testFromCluster() throws PlanningException {
+    // Submits ten R_ALL requests and expects the plan to hold all of them.
+    // int numJobsInScenario = initializeScenario3();
+
+    List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
+
+    // Create reservation definitions (times look like epoch ms; TODO confirm)
+    list.add(createReservationDefinition(
+        1425716392178L, // Job arrival time
+        1425722262791L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            7, // Num containers
+            1, // Concurrency
+            587000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u1"));
+
+    list.add(createReservationDefinition(
+        1425716406178L, // Job arrival time
+        1425721255841L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            6, // Num containers
+            1, // Concurrency
+            485000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u2"));
+
+    list.add(createReservationDefinition(
+        1425716399178L, // Job arrival time
+        1425723780138L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            6, // Num containers
+            1, // Concurrency
+            738000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u3"));
+
+    list.add(createReservationDefinition(
+        1425716437178L, // Job arrival time
+        1425722968378L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            7, // Num containers
+            1, // Concurrency
+            653000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u4"));
+
+    list.add(createReservationDefinition(
+        1425716406178L, // Job arrival time
+        1425721926090L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            6, // Num containers
+            1, // Concurrency
+            552000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u5"));
+
+    list.add(createReservationDefinition(
+        1425716379178L, // Job arrival time
+        1425722238553L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            6, // Num containers
+            1, // Concurrency
+            586000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u6"));
+
+    list.add(createReservationDefinition(
+        1425716407178L, // Job arrival time
+        1425722908317L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            7, // Num containers
+            1, // Concurrency
+            650000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u7"));
+
+    list.add(createReservationDefinition(
+        1425716452178L, // Job arrival time
+        1425722841562L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            6, // Num containers
+            1, // Concurrency
+            639000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u8"));
+
+    list.add(createReservationDefinition(
+        1425716384178L, // Job arrival time
+        1425721766129L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            7, // Num containers
+            1, // Concurrency
+            538000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u9"));
+
+    list.add(createReservationDefinition(
+        1425716437178L, // Job arrival time
+        1425722507886L, // Job deadline
+        new ReservationRequest[] { ReservationRequest.newInstance(
+            Resource.newInstance(1024, 1), // Capability
+            5, // Num containers
+            1, // Concurrency
+            607000) }, // Duration
+        ReservationRequestInterpreter.R_ALL, "u10"));
+
+    // Submit each definition to the agent, as user "u1", "u2", ... in list order
+    int i = 1;
+    for (ReservationDefinition rr : list) {
+      ReservationId reservationID =
+          ReservationSystemTestUtil.getNewReservationId();
+      agent.createReservation(reservationID, "u" + Integer.toString(i), plan,
+          rr);
+      ++i;
+    }
+
+    // CHECK: the plan now holds one reservation per submitted definition
+    assertTrue("Agent-based allocation failed", plan.getAllReservations()
+        .size() == list.size());
+
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // @Before fixture: builds the agent and a new InMemoryPlan for the tests.
+    // Draw a seed from rand, re-seed rand with it, and log it
+    long seed = rand.nextLong();
+    rand.setSeed(seed);
+    Log.info("Running with seed: " + seed);
+
+    // Set cluster parameters (capacity below is 100 x a (1024 mem, 1 core) unit)
+    long timeWindow = 1000000L;
+    int capacityMem = 100 * 1024;
+    int capacityCores = 100;
+    step = 60000L;
+
+    Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
+
+    // Set configuration and initialize the sharing policy from it
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    String reservationQ = testUtil.getFullReservationQueueName();
+    float instConstraint = 100;
+    float avgConstraint = 100;
+
+    ReservationSchedulerConfiguration conf =
+        ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
+            instConstraint, avgConstraint);
+
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, conf);
+
+    QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+    // Set planning agent
+    agent = new AlignedPlannerWithGreedy();
+
+    // Create Plan (queue "dedicated", no replanner, move-on-expiry enabled)
+    plan =
+        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+            res, minAlloc, maxAlloc, "dedicated", null, true);
+  }
+
+  private int initializeScenario1() throws PlanningException {
+
+    // Pre-load the plan with one fixed allocation starting at time 0, so that
+    // later assignments are made against a plan that is not empty
+
+    addFixedAllocation(0L, step, new int[] { 10, 10, 20, 20, 20, 10, 10 });
+
+    System.out.println("--------BEFORE AGENT----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+    // Always returns 1 (NOTE(review): presumably the number of jobs added).
+    return 1;
+
+  }
+
+  private int initializeScenario2() throws PlanningException {
+
+    // Pre-load the plan with one fixed allocation starting at 11 * step, so
+    // that later assignments are made against a plan that is not empty
+
+    addFixedAllocation(11 * step, step, new int[] { 90, 90, 90 });
+
+    System.out.println("--------BEFORE AGENT----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+    // Always returns 1 (NOTE(review): presumably the number of jobs added).
+    return 1;
+
+  }
+
+  private int initializeScenario3() throws PlanningException {
+
+    // Pre-load the plan with one fixed allocation starting at 10 * step, so
+    // that later assignments are made against a plan that is not empty
+
+    addFixedAllocation(10 * step, step, new int[] { 70, 80, 60 });
+
+    System.out.println("--------BEFORE AGENT----------");
+    System.out.println(plan.toString());
+    System.out.println(plan.toCumulativeString());
+    // Always returns 1 (NOTE(review): presumably the number of jobs added).
+    return 1;
+
+  }
+
+  private void addFixedAllocation(long start, long step, int[] f)
+      throws PlanningException {
+    // Adds a "user_fixed" reservation built from f and asserts it is accepted.
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(
+            ReservationSystemTestUtil.getNewReservationId(), null,
+            "user_fixed", "dedicated", start, start + f.length * step,
+            ReservationSystemTestUtil.generateAllocation(start, step, f), res,
+            minAlloc)));
+
+  }
+
+  private ReservationDefinition createReservationDefinition(long arrival,
+      long deadline, ReservationRequest[] reservationRequests,
+      ReservationRequestInterpreter rType, String username) {
+    // Wraps the requests and interpreter into a new ReservationDefinition.
+    return ReservationDefinition.newInstance(arrival, deadline,
+        ReservationRequests.newInstance(Arrays.asList(reservationRequests),
+            rType), username);
+
+  }
+
+  private boolean check(ReservationAllocation alloc, long start, long end,
+      int containers, int mem, int cores) {
+    // Probes alloc at every integer time in [start, end); false on a mismatch.
+    Resource expectedResources =
+        Resource.newInstance(mem * containers, cores * containers);
+
+    // Verify that all allocations in [start,end) equal containers * (mem,cores)
+    for (long i = start; i < end; i++) {
+      if (!Resources.equals(alloc.getResourcesAtTime(i), expectedResources)) {
+        return false;
+      }
+    }
+    return true;
+
+  }
+
+}


[4/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino) (cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)

Posted by cu...@apache.org.
YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)
(cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/26ea0458
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/26ea0458
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/26ea0458

Branch: refs/heads/branch-2
Commit: 26ea0458143cf09c01b5763622852bcbf9fd15ca
Parents: 621203b
Author: ccurino <cc...@ubuntu.gateway.2wire.net>
Authored: Sat Jul 25 07:39:47 2015 -0700
Committer: ccurino <cc...@ubuntu.gateway.2wire.net>
Committed: Sat Jul 25 07:47:11 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../reservation/AbstractReservationSystem.java  |   2 +
 .../reservation/GreedyReservationAgent.java     | 390 ---------
 .../reservation/InMemoryPlan.java               |  13 +-
 .../InMemoryReservationAllocation.java          |   8 +-
 .../resourcemanager/reservation/Plan.java       |   1 +
 .../reservation/PlanContext.java                |   2 +
 .../resourcemanager/reservation/PlanView.java   |  31 +-
 .../resourcemanager/reservation/Planner.java    |  47 --
 .../RLESparseResourceAllocation.java            |  55 +-
 .../reservation/ReservationAgent.java           |  72 --
 .../ReservationSchedulerConfiguration.java      |   6 +-
 .../reservation/ReservationSystem.java          |   5 +-
 .../reservation/ReservationSystemUtil.java      |   6 +-
 .../reservation/SimpleCapacityReplanner.java    | 113 ---
 .../planning/AlignedPlannerWithGreedy.java      | 123 +++
 .../planning/GreedyReservationAgent.java        |  97 +++
 .../reservation/planning/IterativePlanner.java  | 338 ++++++++
 .../reservation/planning/Planner.java           |  49 ++
 .../reservation/planning/PlanningAlgorithm.java | 207 +++++
 .../reservation/planning/ReservationAgent.java  |  73 ++
 .../planning/SimpleCapacityReplanner.java       | 118 +++
 .../reservation/planning/StageAllocator.java    |  55 ++
 .../planning/StageAllocatorGreedy.java          | 152 ++++
 .../planning/StageAllocatorLowCostAligned.java  | 360 ++++++++
 .../planning/StageEarliestStart.java            |  46 ++
 .../planning/StageEarliestStartByDemand.java    | 106 +++
 .../StageEarliestStartByJobArrival.java         |  39 +
 .../planning/TryManyReservationAgents.java      | 114 +++
 .../reservation/ReservationSystemTestUtil.java  |   5 +-
 .../reservation/TestCapacityOverTimePolicy.java |   2 +-
 .../TestCapacitySchedulerPlanFollower.java      |   1 +
 .../reservation/TestFairReservationSystem.java  |   1 -
 .../TestFairSchedulerPlanFollower.java          |   1 +
 .../reservation/TestGreedyReservationAgent.java | 604 --------------
 .../reservation/TestInMemoryPlan.java           |   2 +
 .../reservation/TestNoOverCommitPolicy.java     |   1 +
 .../TestRLESparseResourceAllocation.java        |  51 +-
 .../TestSchedulerPlanFollowerBase.java          |   1 +
 .../TestSimpleCapacityReplanner.java            | 162 ----
 .../planning/TestAlignedPlanner.java            | 820 +++++++++++++++++++
 .../planning/TestGreedyReservationAgent.java    | 611 ++++++++++++++
 .../planning/TestSimpleCapacityReplanner.java   | 170 ++++
 43 files changed, 3634 insertions(+), 1429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b803b89..c3f2015 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -92,6 +92,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2019. Retrospect on decision of making RM crashed if any exception throw 
     in ZKRMStateStore. (Jian He via junping_du)
 
+    YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. 
+    (Jonathan Yaniv and Ishai Menache via curino)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index 8a15ac6..d2603c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
deleted file mode 100644
index 214df1c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This Agent employs a simple greedy placement strategy, placing the various
- * stages of a {@link ReservationRequest} from the deadline moving backward
- * towards the arrival. This allows jobs with earlier deadline to be scheduled
- * greedily as well. Combined with an opportunistic anticipation of work if the
- * cluster is not fully utilized also seems to provide good latency for
- * best-effort jobs (i.e., jobs running without a reservation).
- * 
- * This agent does not account for locality and only consider container
- * granularity for validation purposes (i.e., you can't exceed max-container
- * size).
- */
-public class GreedyReservationAgent implements ReservationAgent {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(GreedyReservationAgent.class);
-
-  @Override
-  public boolean createReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException {
-    return computeAllocation(reservationId, user, plan, contract, null);
-  }
-
-  @Override
-  public boolean updateReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException {
-    return computeAllocation(reservationId, user, plan, contract,
-        plan.getReservationById(reservationId));
-  }
-
-  @Override
-  public boolean deleteReservation(ReservationId reservationId, String user,
-      Plan plan) throws PlanningException {
-    return plan.deleteReservation(reservationId);
-  }
-
-  private boolean computeAllocation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract,
-      ReservationAllocation oldReservation) throws PlanningException,
-      ContractValidationException {
-    LOG.info("placing the following ReservationRequest: " + contract);
-
-    Resource totalCapacity = plan.getTotalCapacity();
-
-    // Here we can addd logic to adjust the ResourceDefinition to account for
-    // system "imperfections" (e.g., scheduling delays for large containers).
-
-    // Align with plan step conservatively (i.e., ceil arrival, and floor
-    // deadline)
-    long earliestStart = contract.getArrival();
-    long step = plan.getStep();
-    if (earliestStart % step != 0) {
-      earliestStart = earliestStart + (step - (earliestStart % step));
-    }
-    long deadline =
-        contract.getDeadline() - contract.getDeadline() % plan.getStep();
-
-    // setup temporary variables to handle time-relations between stages and
-    // intermediate answers
-    long curDeadline = deadline;
-    long oldDeadline = -1;
-
-    Map<ReservationInterval, Resource> allocations =
-        new HashMap<ReservationInterval, Resource>();
-    RLESparseResourceAllocation tempAssigned =
-        new RLESparseResourceAllocation(plan.getResourceCalculator(),
-            plan.getMinimumAllocation());
-
-    List<ReservationRequest> stages = contract.getReservationRequests()
-        .getReservationResources();
-    ReservationRequestInterpreter type = contract.getReservationRequests()
-        .getInterpreter();
-
-    boolean hasGang = false;
-
-    // Iterate the stages in backward from deadline
-    for (ListIterator<ReservationRequest> li = 
-        stages.listIterator(stages.size()); li.hasPrevious();) {
-
-      ReservationRequest currentReservationStage = li.previous();
-
-      // validate the RR respect basic constraints
-      validateInput(plan, currentReservationStage, totalCapacity);
-
-      hasGang |= currentReservationStage.getConcurrency() > 1;
-
-      // run allocation for a single stage
-      Map<ReservationInterval, Resource> curAlloc =
-          placeSingleStage(plan, tempAssigned, currentReservationStage,
-              earliestStart, curDeadline, oldReservation, totalCapacity);
-
-      if (curAlloc == null) {
-        // if we did not find an allocation for the currentReservationStage
-        // return null, unless the ReservationDefinition we are placing is of
-        // type ANY
-        if (type != ReservationRequestInterpreter.R_ANY) {
-          throw new PlanningException("The GreedyAgent"
-              + " couldn't find a valid allocation for your request");
-        } else {
-          continue;
-        }
-      } else {
-
-        // if we did find an allocation add it to the set of allocations
-        allocations.putAll(curAlloc);
-
-        // if this request is of type ANY we are done searching (greedy)
-        // and can return the current allocation (break-out of the search)
-        if (type == ReservationRequestInterpreter.R_ANY) {
-          break;
-        }
-
-        // if the request is of ORDER or ORDER_NO_GAP we constraint the next
-        // round of allocation to precede the current allocation, by setting
-        // curDeadline
-        if (type == ReservationRequestInterpreter.R_ORDER
-            || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-          curDeadline = findEarliestTime(curAlloc.keySet());
-
-          // for ORDER_NO_GAP verify that the allocation found so far has no
-          // gap, return null otherwise (the greedy procedure failed to find a
-          // no-gap
-          // allocation)
-          if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
-              && oldDeadline > 0) {
-            if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
-                .getStep()) {
-              throw new PlanningException("The GreedyAgent"
-                  + " couldn't find a valid allocation for your request");
-            }
-          }
-          // keep the variable oldDeadline pointing to the last deadline we
-          // found
-          oldDeadline = curDeadline;
-        }
-      }
-    }
-
-    // / If we got here is because we failed to find an allocation for the
-    // ReservationDefinition give-up and report failure to the user
-    if (allocations.isEmpty()) {
-      throw new PlanningException("The GreedyAgent"
-          + " couldn't find a valid allocation for your request");
-    }
-
-    // create reservation with above allocations if not null/empty
-
-    Resource ZERO_RES = Resource.newInstance(0, 0);
-
-    long firstStartTime = findEarliestTime(allocations.keySet());
-    
-    // add zero-padding from arrival up to the first non-null allocation
-    // to guarantee that the reservation exists starting at arrival
-    if (firstStartTime > earliestStart) {
-      allocations.put(new ReservationInterval(earliestStart,
-          firstStartTime), ZERO_RES);
-      firstStartTime = earliestStart;
-      // consider to add trailing zeros at the end for simmetry
-    }
-
-    // Actually add/update the reservation in the plan.
-    // This is subject to validation as other agents might be placing
-    // in parallel and there might be sharing policies the agent is not
-    // aware off.
-    ReservationAllocation capReservation =
-        new InMemoryReservationAllocation(reservationId, contract, user,
-            plan.getQueueName(), firstStartTime,
-            findLatestTime(allocations.keySet()), allocations,
-            plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
-    if (oldReservation != null) {
-      return plan.updateReservation(capReservation);
-    } else {
-      return plan.addReservation(capReservation);
-    }
-  }
-
-  private void validateInput(Plan plan, ReservationRequest rr,
-      Resource totalCapacity) throws ContractValidationException {
-
-    if (rr.getConcurrency() < 1) {
-      throw new ContractValidationException("Gang Size should be >= 1");
-    }
-
-    if (rr.getNumContainers() <= 0) {
-      throw new ContractValidationException("Num containers should be >= 0");
-    }
-
-    // check that gangSize and numContainers are compatible
-    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
-      throw new ContractValidationException(
-          "Parallelism must be an exact multiple of gang size");
-    }
-
-    // check that the largest container request does not exceed
-    // the cluster-wide limit for container sizes
-    if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
-        rr.getCapability(), plan.getMaximumAllocation())) {
-      throw new ContractValidationException("Individual"
-          + " capability requests should not exceed cluster's maxAlloc");
-    }
-  }
-
-  /**
-   * This method actually perform the placement of an atomic stage of the
-   * reservation. The key idea is to traverse the plan backward for a
-   * "lease-duration" worth of time, and compute what is the maximum multiple of
-   * our concurrency (gang) parameter we can fit. We do this and move towards
-   * previous instant in time until the time-window is exhausted or we placed
-   * all the user request.
-   */
-  private Map<ReservationInterval, Resource> placeSingleStage(
-      Plan plan, RLESparseResourceAllocation tempAssigned,
-      ReservationRequest rr, long earliestStart, long curDeadline,
-      ReservationAllocation oldResAllocation, final Resource totalCapacity) {
-
-    Map<ReservationInterval, Resource> allocationRequests =
-        new HashMap<ReservationInterval, Resource>();
-
-    // compute the gang as a resource and get the duration
-    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
-    long dur = rr.getDuration();
-    long step = plan.getStep();
-
-    // ceil the duration to the next multiple of the plan step
-    if (dur % step != 0) {
-      dur += (step - (dur % step));
-    }
-
-    // we know for sure that this division has no remainder (part of contract
-    // with user, validate before
-    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
-
-    int maxGang = 0;
-
-    // loop trying to place until we are done, or we are considering
-    // an invalid range of times
-    while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
-
-      // as we run along we remember how many gangs we can fit, and what
-      // was the most constraining moment in time (we will restart just
-      // after that to place the next batch)
-      maxGang = gangsToPlace;
-      long minPoint = curDeadline;
-      int curMaxGang = maxGang;
-
-      // start placing at deadline (excluded due to [,) interval semantics and
-      // move backward
-      for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
-          && maxGang > 0; t = t - plan.getStep()) {
-
-        // As we run along we will logically remove the previous allocation for
-        // this reservation
-        // if one existed
-        Resource oldResCap = Resource.newInstance(0, 0);
-        if (oldResAllocation != null) {
-          oldResCap = oldResAllocation.getResourcesAtTime(t);
-        }
-
-        // compute net available resources
-        Resource netAvailableRes = Resources.clone(totalCapacity);
-        Resources.addTo(netAvailableRes, oldResCap);
-        Resources.subtractFrom(netAvailableRes,
-            plan.getTotalCommittedResources(t));
-        Resources.subtractFrom(netAvailableRes,
-            tempAssigned.getCapacityAtTime(t));
-
-        // compute maximum number of gangs we could fit
-        curMaxGang =
-            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
-                totalCapacity, netAvailableRes, gang));
-
-        // pick the minimum between available resources in this instant, and how
-        // many gangs we have to place
-        curMaxGang = Math.min(gangsToPlace, curMaxGang);
-
-        // compare with previous max, and set it. also remember *where* we found
-        // the minimum (useful for next attempts)
-        if (curMaxGang <= maxGang) {
-          maxGang = curMaxGang;
-          minPoint = t;
-        }
-      }
-
-      // if we were able to place any gang, record this, and decrement
-      // gangsToPlace
-      if (maxGang > 0) {
-        gangsToPlace -= maxGang;
-
-        ReservationInterval reservationInt =
-            new ReservationInterval(curDeadline - dur, curDeadline);
-        ReservationRequest reservationRequest =
-            ReservationRequest.newInstance(rr.getCapability(),
-                rr.getConcurrency() * maxGang, rr.getConcurrency(),
-                rr.getDuration());
-        // remember occupied space (plan is read-only till we find a plausible
-        // allocation for the entire request). This is needed since we might be
-        // placing other ReservationRequest within the same
-        // ReservationDefinition,
-        // and we must avoid double-counting the available resources
-        final Resource reservationRes = ReservationSystemUtil.toResource(
-            reservationRequest);
-        tempAssigned.addInterval(reservationInt, reservationRes);
-        allocationRequests.put(reservationInt, reservationRes);
-
-      }
-
-      // reset our new starting point (curDeadline) to the most constraining
-      // point so far, we will look "left" of that to find more places where
-      // to schedule gangs (for sure nothing on the "right" of this point can
-      // fit a full gang.
-      curDeadline = minPoint;
-    }
-
-    // if no gangs are left to place we succeed and return the allocation
-    if (gangsToPlace == 0) {
-      return allocationRequests;
-    } else {
-      // If we are here is becasue we did not manage to satisfy this request.
-      // So we need to remove unwanted side-effect from tempAssigned (needed
-      // for ANY).
-      for (Map.Entry<ReservationInterval, Resource> tempAllocation :
-        allocationRequests.entrySet()) {
-        tempAssigned.removeInterval(tempAllocation.getKey(),
-            tempAllocation.getValue());
-      }
-      // and return null to signal failure in this allocation
-      return null;
-    }
-  }
-
-  // finds the leftmost point of this set of ReservationInterval
-  private long findEarliestTime(Set<ReservationInterval> resInt) {
-    long ret = Long.MAX_VALUE;
-    for (ReservationInterval s : resInt) {
-      if (s.getStartTime() < ret) {
-        ret = s.getStartTime();
-      }
-    }
-    return ret;
-  }
-
-  // finds the rightmost point of this set of ReservationIntervals
-  private long findLatestTime(Set<ReservationInterval> resInt) {
-    long ret = Long.MIN_VALUE;
-    for (ReservationInterval s : resInt) {
-      if (s.getEndTime() > ret) {
-        ret = s.getEndTime();
-      }
-    }
-    return ret;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 50d66cf..abc9c98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.UTCClock;
@@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class InMemoryPlan implements Plan {
+/**
+ * This class is an in-memory representation of the state of our
+ * reservation system, and provides accelerated access to both individual
+ * reservations and aggregate utilization of resources over time.
+ */
+public class InMemoryPlan implements Plan {
 
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
 
@@ -75,7 +82,7 @@ class InMemoryPlan implements Plan {
 
   private Resource totalCapacity;
 
-  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
       String queueName, Planner replanner, boolean getMoveOnExpiry) {
@@ -83,7 +90,7 @@ class InMemoryPlan implements Plan {
         maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
   }
 
-  InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
+  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
       ReservationAgent agent, Resource totalCapacity, long step,
       ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
       String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index a4dd23b..42a2243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 /**
  * An in memory implementation of a reservation allocation using the
  * {@link RLESparseResourceAllocation}
- * 
+ *
  */
-class InMemoryReservationAllocation implements ReservationAllocation {
+public class InMemoryReservationAllocation implements ReservationAllocation {
 
   private final String planName;
   private final ReservationId reservationID;
@@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
 
   private RLESparseResourceAllocation resourcesOverTime;
 
-  InMemoryReservationAllocation(ReservationId reservationID,
+  public InMemoryReservationAllocation(ReservationId reservationID,
       ReservationDefinition contract, String user, String planName,
       long startTime, long endTime,
       Map<ReservationInterval, Resource> allocations,
@@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
         allocations, calculator, minAlloc, false);
   }
 
-  InMemoryReservationAllocation(ReservationId reservationID,
+  public InMemoryReservationAllocation(ReservationId reservationID,
       ReservationDefinition contract, String user, String planName,
       long startTime, long endTime,
       Map<ReservationInterval, Resource> allocations,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
index e8e9e29..f7ffbd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * A Plan represents the central data structure of a reservation system that

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
index 6d3506d..94e299e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index b49e99e..be68906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -1,26 +1,27 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *       http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * This interface provides a read-only view on the allocations made in this

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
deleted file mode 100644
index 57f28ff..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-public interface Planner {
-
-  /**
-   * Update the existing {@link Plan}, by adding/removing/updating existing
-   * reservations, and adding a subset of the reservation requests in the
-   * contracts parameter.
-   *
-   * @param plan the {@link Plan} to replan
-   * @param contracts the list of reservation requests
-   * @throws PlanningException
-   */
-  public void plan(Plan plan, List<ReservationDefinition> contracts)
-      throws PlanningException;
-
-  /**
-   * Initialize the replanner
-   *
-   * @param planQueueName the name of the queue for this plan
-   * @param conf the scheduler configuration
-   */
-  void init(String planQueueName, ReservationSchedulerConfiguration conf);
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 2957cc6..80f2ff7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter;
 
 /**
  * This is a run length encoded sparse data structure that maintains resource
- * allocations over time
+ * allocations over time.
  */
 public class RLESparseResourceAllocation {
 
@@ -74,7 +74,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Add a resource for the specified interval
-   * 
+   *
    * @param reservationInterval the interval for which the resource is to be
    *          added
    * @param totCap the resource to be added
@@ -138,7 +138,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Removes a resource for the specified interval
-   * 
+   *
    * @param reservationInterval the interval for which the resource is to be
    *          removed
    * @param totCap the resource to be removed
@@ -189,7 +189,7 @@ public class RLESparseResourceAllocation {
   /**
    * Returns the capacity, i.e. total resources allocated at the specified point
    * of time
-   * 
+   *
    * @param tick the time (UTC in ms) at which the capacity is requested
    * @return the resources allocated at the specified time
    */
@@ -208,7 +208,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Get the timestamp of the earliest resource allocation
-   * 
+   *
    * @return the timestamp of the first resource allocation
    */
   public long getEarliestStartTime() {
@@ -226,7 +226,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Get the timestamp of the latest resource allocation
-   * 
+   *
    * @return the timestamp of the last resource allocation
    */
   public long getLatestEndTime() {
@@ -244,7 +244,7 @@ public class RLESparseResourceAllocation {
 
   /**
    * Returns true if there are no non-zero entries
-   * 
+   *
    * @return true if there are no allocations or false otherwise
    */
   public boolean isEmpty() {
@@ -287,7 +287,7 @@ public class RLESparseResourceAllocation {
   /**
    * Returns the JSON string representation of the current resources allocated
    * over time
-   * 
+   *
    * @return the JSON string representation of the current resources allocated
    *         over time
    */
@@ -312,4 +312,43 @@ public class RLESparseResourceAllocation {
     }
   }
 
+  /**
+   * Returns the representation of the current resources allocated over time as
+   * an interval map.
+   *
+   * @return the representation of the current resources allocated over time as
+   *         an interval map.
+   */
+  public Map<ReservationInterval, Resource> toIntervalMap() {
+
+    readLock.lock();
+    try {
+      Map<ReservationInterval, Resource> allocations =
+          new TreeMap<ReservationInterval, Resource>();
+
+      // Empty
+      if (isEmpty()) {
+        return allocations;
+      }
+
+      Map.Entry<Long, Resource> lastEntry = null;
+      for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
+
+        if (lastEntry != null) {
+          ReservationInterval interval =
+              new ReservationInterval(lastEntry.getKey(), entry.getKey());
+          Resource resource = lastEntry.getValue();
+
+          allocations.put(interval, resource);
+        }
+
+        lastEntry = entry;
+      }
+      return allocations;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
deleted file mode 100644
index 6955036..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *******************************************************************************/
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-
-/**
- * An entity that seeks to acquire resources to satisfy an user's contract
- */
-public interface ReservationAgent {
-
-  /**
-   * Create a reservation for the user that abides by the specified contract
-   * 
-   * @param reservationId the identifier of the reservation to be created.
-   * @param user the user who wants to create the reservation
-   * @param plan the Plan to which the reservation must be fitted
-   * @param contract encapsulates the resources the user requires for his
-   *          session
-   * 
-   * @return whether the create operation was successful or not
-   * @throws PlanningException if the session cannot be fitted into the plan
-   */
-  public boolean createReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException;
-
-  /**
-   * Update a reservation for the user that abides by the specified contract
-   * 
-   * @param reservationId the identifier of the reservation to be updated
-   * @param user the user who wants to create the session
-   * @param plan the Plan to which the reservation must be fitted
-   * @param contract encapsulates the resources the user requires for his
-   *          reservation
-   * 
-   * @return whether the update operation was successful or not
-   * @throws PlanningException if the reservation cannot be fitted into the plan
-   */
-  public boolean updateReservation(ReservationId reservationId, String user,
-      Plan plan, ReservationDefinition contract) throws PlanningException;
-
-  /**
-   * Delete an user reservation
-   * 
-   * @param reservationId the identifier of the reservation to be deleted
-   * @param user the user who wants to create the reservation
-   * @param plan the Plan to which the session must be fitted
-   * 
-   * @return whether the delete operation was successful or not
-   * @throws PlanningException if the reservation cannot be fitted into the plan
-   */
-  public boolean deleteReservation(ReservationId reservationId, String user,
-      Plan plan) throws PlanningException;
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
index 2af1ffd..c430b1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 
 public abstract class ReservationSchedulerConfiguration extends Configuration {
 
@@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
 
   @InterfaceAudience.Private
   public static final String DEFAULT_RESERVATION_AGENT_NAME =
-      "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy";
 
   @InterfaceAudience.Private
   public static final String DEFAULT_RESERVATION_PLANNER_NAME =
-      "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
+      "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner";
 
   @InterfaceAudience.Private
   public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index cb76dcf..3309693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 
 /**
  * This interface is the one implemented by any system that wants to support

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
index 8affae4..5562adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.util.HashMap;
 import java.util.Map;
 
-final class ReservationSystemUtil {
+/**
+ * Simple helper class with static methods used to convert between
+ * common formats in tests.
+ */
+public final class ReservationSystemUtil {
 
   private ReservationSystemUtil() {
     // not called

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
deleted file mode 100644
index b5a6a99..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.UTCClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This (re)planner scan a period of time from now to a maximum time window (or
- * the end of the last session, whichever comes first) checking the overall
- * capacity is not violated.
- * 
- * It greedily removes sessions in reversed order of acceptance (latest accepted
- * is the first removed).
- */
-public class SimpleCapacityReplanner implements Planner {
-
-  private static final Log LOG = LogFactory
-      .getLog(SimpleCapacityReplanner.class);
-
-  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-
-  private final Clock clock;
-
-  // this allows to control to time-span of this replanning
-  // far into the future time instants might be worth replanning for
-  // later on
-  private long lengthOfCheckZone;
-
-  public SimpleCapacityReplanner() {
-    this(new UTCClock());
-  }
-
-  @VisibleForTesting
-  SimpleCapacityReplanner(Clock clock) {
-    this.clock = clock;
-  }
-
-  @Override
-  public void init(String planQueueName,
-      ReservationSchedulerConfiguration conf) {
-    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
-  }
-
-  @Override
-  public void plan(Plan plan, List<ReservationDefinition> contracts)
-      throws PlanningException {
-
-    if (contracts != null) {
-      throw new RuntimeException(
-          "SimpleCapacityReplanner cannot handle new reservation contracts");
-    }
-
-    ResourceCalculator resCalc = plan.getResourceCalculator();
-    Resource totCap = plan.getTotalCapacity();
-    long now = clock.getTime();
-
-    // loop on all moment in time from now to the end of the check Zone
-    // or the end of the planned sessions whichever comes first
-    for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
-        plan.getStep()) {
-      Resource excessCap =
-          Resources.subtract(plan.getTotalCommittedResources(t), totCap);
-      // if we are violating
-      if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
-        // sorted on reverse order of acceptance, so newest reservations first
-        Set<ReservationAllocation> curReservations =
-            new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
-        for (Iterator<ReservationAllocation> resIter =
-            curReservations.iterator(); resIter.hasNext()
-            && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
-          ReservationAllocation reservation = resIter.next();
-          plan.deleteReservation(reservation.getReservationId());
-          excessCap =
-              Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
-          LOG.info("Removing reservation " + reservation.getReservationId()
-              + " to repair physical-resource constraints in the plan: "
-              + plan.getQueueName());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
new file mode 100644
index 0000000..a389928
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A planning algorithm that first runs LowCostAligned, and if it fails runs
+ * Greedy.
+ */
+public class AlignedPlannerWithGreedy implements ReservationAgent {
+
+  // Default smoothness factor
+  private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
+
+  // Log
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AlignedPlannerWithGreedy.class);
+
+  // Planner: tries LowCostAligned first and falls back to Greedy
+  private final ReservationAgent planner;
+
+  // Constructor
+  public AlignedPlannerWithGreedy() {
+    this(DEFAULT_SMOOTHNESS_FACTOR);
+  }
+
+  // Constructor
+  public AlignedPlannerWithGreedy(int smoothnessFactor) {
+
+    // List of algorithms
+    List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
+
+    // LowCostAligned planning algorithm
+    ReservationAgent algAligned =
+        new IterativePlanner(new StageEarliestStartByDemand(),
+            new StageAllocatorLowCostAligned(smoothnessFactor));
+    listAlg.add(algAligned);
+
+    // Greedy planning algorithm
+    ReservationAgent algGreedy =
+        new IterativePlanner(new StageEarliestStartByJobArrival(),
+            new StageAllocatorGreedy());
+    listAlg.add(algGreedy);
+
+    // Set planner:
+    // 1. Attempt to execute algAligned
+    // 2. If failed, fall back to algGreedy
+    planner = new TryManyReservationAgents(listAlg);
+
+  }
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("placing the following ReservationRequest: " + contract);
+
+    try {
+      boolean res =
+          planner.createReservation(reservationId, user, plan, contract);
+
+      if (res) {
+        LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      } else {
+        LOG.info("OUTCOME: FAILURE, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      }
+      return res;
+    } catch (PlanningException e) {
+      LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+          + ", Contract: " + contract.toString());
+      throw e;
+    }
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("updating the following ReservationRequest: " + contract);
+
+    return planner.updateReservation(reservationId, user, plan, contract);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    LOG.info("removing the following ReservationId: " + reservationId);
+
+    return planner.deleteReservation(reservationId, user, plan);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
new file mode 100644
index 0000000..db82a66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Agent employs a simple greedy placement strategy, placing the various
+ * stages of a {@link ReservationDefinition} from the deadline moving backward
+ * towards the arrival. This allows jobs with earlier deadline to be scheduled
+ * greedily as well. Combined with an opportunistic anticipation of work when
+ * the cluster is not fully utilized, it also seems to provide good latency for
+ * best-effort jobs (i.e., jobs running without a reservation).
+ *
+ * This agent does not account for locality and only considers container
+ * granularity for validation purposes (i.e., you can't exceed max-container
+ * size).
+ */
+
+public class GreedyReservationAgent implements ReservationAgent {
+
+  // Log
+  private static final Logger LOG = LoggerFactory
+      .getLogger(GreedyReservationAgent.class);
+
+  // Greedy planner
+  private final ReservationAgent planner = new IterativePlanner(
+      new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("placing the following ReservationRequest: " + contract);
+
+    try {
+      boolean res =
+          planner.createReservation(reservationId, user, plan, contract);
+
+      if (res) {
+        LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      } else {
+        LOG.info("OUTCOME: FAILURE, Reservation ID: "
+            + reservationId.toString() + ", Contract: " + contract.toString());
+      }
+      return res;
+    } catch (PlanningException e) {
+      LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+          + ", Contract: " + contract.toString());
+      throw e;
+    }
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    LOG.info("updating the following ReservationRequest: " + contract);
+
+    return planner.updateReservation(reservationId, user, plan, contract);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    LOG.info("removing the following ReservationId: " + reservationId);
+
+    return planner.deleteReservation(reservationId, user, plan);
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
new file mode 100644
index 0000000..342c2e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A planning algorithm consisting of two main phases. The algorithm iterates
+ * over the job stages in descending order. For each stage, the algorithm: 1.
+ * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
+ * is allocated. 2. Computes an allocation for the stage inside the interval.
+ *
+ * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
+ * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
+ * each stage is set as successorStartingTime - the starting time of its
+ * succeeding stage (or jobDeadline if it is the last stage).
+ *
+ * The phases are set using the two functions: 1. setAlgStageEarliestStart 2.
+ * setAlgStageAllocator
+ */
+public class IterativePlanner extends PlanningAlgorithm {
+
+  // Modifications performed by the algorithm that have not yet been reflected
+  // in the actual plan while a request is still pending.
+  private RLESparseResourceAllocation planModifications;
+
+  // Data extracted from plan
+  private Map<Long, Resource> planLoads;
+  private Resource capacity;
+  private long step;
+
+  // Job parameters
+  private ReservationRequestInterpreter jobType;
+  private long jobArrival;
+  private long jobDeadline;
+
+  // Phase algorithms
+  private StageEarliestStart algStageEarliestStart = null;
+  private StageAllocator algStageAllocator = null;
+
+  // Constructor
+  public IterativePlanner(StageEarliestStart algEarliestStartTime,
+      StageAllocator algStageAllocator) {
+
+    setAlgStageEarliestStart(algEarliestStartTime);
+    setAlgStageAllocator(algStageAllocator);
+
+  }
+
+  @Override
+  public RLESparseResourceAllocation computeJobAllocation(Plan plan,
+      ReservationId reservationId, ReservationDefinition reservation)
+      throws ContractValidationException {
+
+    // Initialize
+    initialize(plan, reservation);
+
+    // If the job has been previously reserved, logically remove its allocation
+    ReservationAllocation oldReservation =
+        plan.getReservationById(reservationId);
+    if (oldReservation != null) {
+      ignoreOldAllocation(oldReservation);
+    }
+
+    // Create the allocations data structure
+    RLESparseResourceAllocation allocations =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+    // Get a reverse iterator for the set of stages
+    ListIterator<ReservationRequest> li =
+        reservation
+            .getReservationRequests()
+            .getReservationResources()
+            .listIterator(
+                reservation.getReservationRequests().getReservationResources()
+                    .size());
+
+    // Current stage
+    ReservationRequest currentReservationStage;
+
+    // Index of the stage currently being processed
+    int index =
+        reservation.getReservationRequests().getReservationResources().size();
+
+    // Stage deadlines
+    long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
+    long successorStartingTime = -1;
+
+    // Iterate the stages in reverse order
+    while (li.hasPrevious()) {
+
+      // Get current stage
+      currentReservationStage = li.previous();
+      index -= 1;
+
+      // Validate that the ReservationRequest respects basic constraints
+      validateInputStage(plan, currentReservationStage);
+
+      // Compute an adjusted earliestStart for this resource
+      // (we need this to provision some space for the ORDER contracts)
+      long stageArrivalTime = reservation.getArrival();
+      if (jobType == ReservationRequestInterpreter.R_ORDER
+          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+        stageArrivalTime =
+            computeEarliestStartingTime(plan, reservation, index,
+                currentReservationStage, stageDeadline);
+      }
+      stageArrivalTime = stepRoundUp(stageArrivalTime, step);
+      stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
+
+      // Compute the allocation of a single stage
+      Map<ReservationInterval, Resource> curAlloc =
+          computeStageAllocation(plan, currentReservationStage,
+              stageArrivalTime, stageDeadline);
+
+      // If we did not find an allocation, return NULL
+      // (unless it's an ANY job, then we simply continue).
+      if (curAlloc == null) {
+
+        // If it's an ANY job, we can move to the next possible request
+        if (jobType == ReservationRequestInterpreter.R_ANY) {
+          continue;
+        }
+
+        // Otherwise, the job cannot be allocated
+        return null;
+
+      }
+
+      // Get the start & end time of the current allocation
+      Long stageStartTime = findEarliestTime(curAlloc.keySet());
+      Long stageEndTime = findLatestTime(curAlloc.keySet());
+
+      // If we did find an allocation for the stage, add it
+      for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
+        allocations.addInterval(entry.getKey(), entry.getValue());
+      }
+
+      // If this is an ANY clause, we have finished
+      if (jobType == ReservationRequestInterpreter.R_ANY) {
+        break;
+      }
+
+      // If ORDER job, set the stageDeadline of the next stage to be processed
+      if (jobType == ReservationRequestInterpreter.R_ORDER
+          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+
+        // Verify that there is no gap, in case the job is ORDER_NO_GAP
+        if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
+            && successorStartingTime != -1
+            && successorStartingTime > stageEndTime) {
+
+          return null;
+
+        }
+
+        // Store the stageStartTime and set the new stageDeadline
+        successorStartingTime = stageStartTime;
+        stageDeadline = stageStartTime;
+
+      }
+
+    }
+
+    // If the allocation is empty, return an error
+    if (allocations.isEmpty()) {
+      return null;
+    }
+
+    return allocations;
+
+  }
+
+  protected void initialize(Plan plan, ReservationDefinition reservation) {
+
+    // Get plan step & capacity
+    capacity = plan.getTotalCapacity();
+    step = plan.getStep();
+
+    // Get job parameters (type, arrival time & deadline)
+    jobType = reservation.getReservationRequests().getInterpreter();
+    jobArrival = stepRoundUp(reservation.getArrival(), step);
+    jobDeadline = stepRoundDown(reservation.getDeadline(), step);
+
+    // Dirty read of plan load
+    planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
+
+    // Initialize the plan modifications
+    planModifications =
+        new RLESparseResourceAllocation(plan.getResourceCalculator(),
+            plan.getMinimumAllocation());
+
+  }
+
+  private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
+      long endTime) {
+
+    // Create map
+    Map<Long, Resource> loads = new HashMap<Long, Resource>();
+
+    // Calculate the load for every time slot between [start,end)
+    for (long t = startTime; t < endTime; t += step) {
+      Resource load = plan.getTotalCommittedResources(t);
+      loads.put(t, load);
+    }
+
+    // Return map
+    return loads;
+
+  }
+
+  private void ignoreOldAllocation(ReservationAllocation oldReservation) {
+
+    // If there is no old reservation, return
+    if (oldReservation == null) {
+      return;
+    }
+
+    // Subtract each allocation interval from the planModifications
+    for (Entry<ReservationInterval, Resource> entry : oldReservation
+        .getAllocationRequests().entrySet()) {
+
+      // Read the entry
+      ReservationInterval interval = entry.getKey();
+      Resource resource = entry.getValue();
+
+      // Negate the old allocation's resource
+      Resource negativeResource = Resources.multiply(resource, -1);
+
+      // Insert it into planModifications as a 'negative' request, to
+      // represent available resources
+      planModifications.addInterval(interval, negativeResource);
+
+    }
+
+  }
+
+  private void validateInputStage(Plan plan, ReservationRequest rr)
+      throws ContractValidationException {
+
+    // Validate concurrency
+    if (rr.getConcurrency() < 1) {
+      throw new ContractValidationException("Gang Size should be >= 1");
+    }
+
+    // Validate number of containers
+    if (rr.getNumContainers() <= 0) {
+      throw new ContractValidationException("Num containers should be > 0");
+    }
+
+    // Check that gangSize and numContainers are compatible
+    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
+      throw new ContractValidationException(
+          "Parallelism must be an exact multiple of gang size");
+    }
+
+    // Check that the largest container request does not exceed the cluster-wide
+    // limit for container sizes
+    if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
+        rr.getCapability(), plan.getMaximumAllocation())) {
+
+      throw new ContractValidationException(
+          "Individual capability requests should not exceed cluster's " +
+          "maxAlloc");
+
+    }
+
+  }
+
+  // Delegate to algStageEarliestStart
+  protected long computeEarliestStartingTime(Plan plan,
+      ReservationDefinition reservation, int index,
+      ReservationRequest currentReservationStage, long stageDeadline) {
+
+    return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
+        currentReservationStage, stageDeadline);
+
+  }
+
+  // Call algStageAllocator
+  protected Map<ReservationInterval, Resource> computeStageAllocation(
+      Plan plan, ReservationRequest rr, long stageArrivalTime,
+      long stageDeadline) {
+
+    return algStageAllocator.computeStageAllocation(plan, planLoads,
+        planModifications, rr, stageArrivalTime, stageDeadline);
+
+  }
+
+  // Set the algorithm: algStageEarliestStart
+  public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+
+    this.algStageEarliestStart = alg;
+    return this; // To allow concatenation of setAlg() functions
+
+  }
+
+  // Set the algorithm: algStageAllocator
+  public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
+
+    this.algStageAllocator = alg;
+    return this; // To allow concatenation of setAlg() functions
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
new file mode 100644
index 0000000..abac6ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+public interface Planner {
+
+  /**
+   * Update the existing {@link Plan}, by adding/removing/updating existing
+   * reservations, and adding a subset of the reservation requests in the
+   * contracts parameter.
+   *
+   * @param plan the {@link Plan} to replan
+   * @param contracts the list of reservation requests
+   * @throws PlanningException
+   */
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException;
+
+  /**
+   * Initialize the replanner
+   *
+   * @param planQueueName the name of the queue for this plan
+   * @param conf the scheduler configuration
+   */
+  void init(String planQueueName, ReservationSchedulerConfiguration conf);
+}


[3/4] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino) (cherry picked from commit 156f24ead00436faad5d4aeef327a546392cd265)

Posted by cu...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
new file mode 100644
index 0000000..9a0a0f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An abstract class that follows the general behavior of planning algorithms.
+ */
+public abstract class PlanningAlgorithm implements ReservationAgent {
+
+  /**
+   * Performs the actual allocation for a ReservationDefinition within a Plan.
+   *
+   * @param reservationId the identifier of the reservation
+   * @param user the user who owns the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources required by the user for his
+   *          session
+   * @param oldReservation the existing reservation (null if none)
+   * @return whether the allocateUser function was successful or not
+   *
+   * @throws PlanningException if the session cannot be fitted into the plan
+   * @throws ContractValidationException
+   */
+  protected boolean allocateUser(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract,
+      ReservationAllocation oldReservation) throws PlanningException,
+      ContractValidationException {
+
+    // Adjust the ResourceDefinition to account for system "imperfections"
+    // (e.g., scheduling delays for large containers).
+    ReservationDefinition adjustedContract = adjustContract(plan, contract);
+
+    // Compute the job allocation
+    RLESparseResourceAllocation allocation =
+        computeJobAllocation(plan, reservationId, adjustedContract);
+
+    // If no job allocation was found, fail
+    if (allocation == null) {
+      throw new PlanningException(
+          "The planning algorithm could not find a valid allocation"
+              + " for your request");
+    }
+
+    // Translate the allocation to a map (with zero paddings)
+    long step = plan.getStep();
+    long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
+    long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
+    Map<ReservationInterval, Resource> mapAllocations =
+        allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
+
+    // Create the reservation
+    ReservationAllocation capReservation =
+        new InMemoryReservationAllocation(reservationId, // ID
+            adjustedContract, // Contract
+            user, // User name
+            plan.getQueueName(), // Queue name
+            findEarliestTime(mapAllocations.keySet()), // Earliest start time
+            findLatestTime(mapAllocations.keySet()), // Latest end time
+            mapAllocations, // Allocations
+            plan.getResourceCalculator(), // Resource calculator
+            plan.getMinimumAllocation()); // Minimum allocation
+
+    // Add (or update) the reservation allocation
+    if (oldReservation != null) {
+      return plan.updateReservation(capReservation);
+    } else {
+      return plan.addReservation(capReservation);
+    }
+
+  }
+
+  private Map<ReservationInterval, Resource>
+      allocationsToPaddedMap(RLESparseResourceAllocation allocation,
+          long jobArrival, long jobDeadline) {
+
+    // Allocate
+    Map<ReservationInterval, Resource> mapAllocations =
+        allocation.toIntervalMap();
+
+    // Zero allocation
+    Resource zeroResource = Resource.newInstance(0, 0);
+
+    // Pad at the beginning
+    long earliestStart = findEarliestTime(mapAllocations.keySet());
+    if (jobArrival < earliestStart) {
+      mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
+          zeroResource);
+    }
+
+    // Pad at the end
+    long latestEnd = findLatestTime(mapAllocations.keySet());
+    if (latestEnd < jobDeadline) {
+      mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
+          zeroResource);
+    }
+
+    return mapAllocations;
+
+  }
+
+  public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
+      ReservationId reservationId, ReservationDefinition reservation)
+      throws PlanningException, ContractValidationException;
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Allocate
+    return allocateUser(reservationId, user, plan, contract, null);
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Get the old allocation
+    ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
+
+    // Allocate (ignores the old allocation)
+    return allocateUser(reservationId, user, plan, contract, oldAlloc);
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    // Delete the existing reservation
+    return plan.deleteReservation(reservationId);
+
+  }
+
+  protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
+
+    long ret = Long.MAX_VALUE;
+    for (ReservationInterval s : sesInt) {
+      if (s.getStartTime() < ret) {
+        ret = s.getStartTime();
+      }
+    }
+    return ret;
+
+  }
+
+  protected static long findLatestTime(Set<ReservationInterval> sesInt) {
+
+    long ret = Long.MIN_VALUE;
+    for (ReservationInterval s : sesInt) {
+      if (s.getEndTime() > ret) {
+        ret = s.getEndTime();
+      }
+    }
+    return ret;
+
+  }
+
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+
+  private ReservationDefinition adjustContract(Plan plan,
+      ReservationDefinition originalContract) {
+
+    // Place here adjustment. For example using QueueMetrics we can track
+    // large container delays per YARN-1990
+
+    return originalContract;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
new file mode 100644
index 0000000..bdea2f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *******************************************************************************/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * An entity that seeks to acquire resources to satisfy a user's contract.
+ */
+public interface ReservationAgent {
+
+  /**
+   * Creates a reservation for the user that abides by the specified contract.
+   *
+   * @param reservationId the identifier of the reservation to be created
+   * @param user the user who wants to create the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for the
+   *          reservation
+   *
+   * @return whether the create operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the
+   *           plan
+   */
+  boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Updates a reservation for the user so that it abides by the specified
+   * contract.
+   *
+   * @param reservationId the identifier of the reservation to be updated
+   * @param user the user who wants to update the reservation
+   * @param plan the Plan to which the reservation must be fitted
+   * @param contract encapsulates the resources the user requires for the
+   *          reservation
+   *
+   * @return whether the update operation was successful or not
+   * @throws PlanningException if the reservation cannot be fitted into the
+   *           plan
+   */
+  boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException;
+
+  /**
+   * Deletes a user reservation.
+   *
+   * @param reservationId the identifier of the reservation to be deleted
+   * @param user the user who wants to delete the reservation
+   * @param plan the Plan from which the reservation is to be removed
+   *
+   * @return whether the delete operation was successful or not
+   * @throws PlanningException if the reservation cannot be removed from the
+   *           plan
+   */
+  boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..7507783
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A (re)planner that scans the plan from the current time up to a maximum
+ * time window (or the end of the last reservation, whichever comes first),
+ * checking that the overall capacity is not violated.
+ *
+ * Wherever the committed resources exceed the total capacity, it greedily
+ * removes reservations in reversed order of acceptance (the latest accepted
+ * is the first removed) until the excess is gone.
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+  private static final Log LOG = LogFactory
+      .getLog(SimpleCapacityReplanner.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private final Clock clock;
+
+  // Bounds the time-span examined by this replanner: time instants far into
+  // the future might be worth replanning for later on.
+  private long lengthOfCheckZone;
+
+  /** Creates a replanner that reads the current time from a UTCClock. */
+  public SimpleCapacityReplanner() {
+    this(new UTCClock());
+  }
+
+  /** Creates a replanner that reads the current time from the given clock. */
+  @VisibleForTesting
+  SimpleCapacityReplanner(Clock clock) {
+    this.clock = clock;
+  }
+
+  /**
+   * Reads the enforcement window configured for the given plan queue and
+   * uses it as the length of the check zone.
+   */
+  @Override
+  public void init(String planQueueName,
+      ReservationSchedulerConfiguration conf) {
+    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+  }
+
+  /**
+   * Walks the plan step by step, starting at the current time, and deletes
+   * reservations wherever the committed resources exceed the total capacity.
+   *
+   * @param plan the plan to repair
+   * @param contracts must be null; this replanner does not accept new
+   *          contracts
+   * @throws PlanningException declared by the Planner contract
+   * @throws RuntimeException if contracts is not null
+   */
+  @Override
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException {
+
+    if (contracts != null) {
+      throw new RuntimeException(
+          "SimpleCapacityReplanner cannot handle new reservation contracts");
+    }
+
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource totCap = plan.getTotalCapacity();
+    long now = clock.getTime();
+
+    // visit every time instant from now to the end of the check zone, or to
+    // the end of the planned reservations, whichever comes first
+    long t = now;
+    while (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)) {
+      repairCapacityAt(plan, t, resCalc, totCap);
+      t += plan.getStep();
+    }
+  }
+
+  /**
+   * Deletes reservations active at time t, one at a time, for as long as the
+   * resources committed at t exceed the total capacity.
+   */
+  private void repairCapacityAt(Plan plan, long t, ResourceCalculator resCalc,
+      Resource totCap) throws PlanningException {
+
+    Resource excessCap =
+        Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+
+    // nothing to repair at this instant
+    if (!Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+      return;
+    }
+
+    // sorted on reverse order of acceptance, so newest reservations first
+    Set<ReservationAllocation> curReservations =
+        new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+    Iterator<ReservationAllocation> resIter = curReservations.iterator();
+
+    while (resIter.hasNext()
+        && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+      ReservationAllocation reservation = resIter.next();
+      plan.deleteReservation(reservation.getReservationId());
+      excessCap =
+          Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+      LOG.info("Removing reservation " + reservation.getReservationId()
+          + " to repair physical-resource constraints in the plan: "
+          + plan.getQueueName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
new file mode 100644
index 0000000..9df6b74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * Interface for allocating a single stage in IterativePlanner.
+ */
+public interface StageAllocator {
+
+  /**
+   * Computes the allocation of a stage inside a defined time interval.
+   *
+   * Note: StageAllocatorGreedy and StageAllocatorLowCostAligned both add
+   * their tentative allocations to planModifications while computing, and
+   * remove them again before returning null.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param planLoads a 'dirty' read of the plan loads at each time
+   * @param planModifications the allocations performed by the planning
+   *          algorithm which are not yet reflected by plan
+   * @param rr the stage
+   * @param stageEarliestStart the arrival time (earliest starting time) set for
+   *          the stage by the two phase planning algorithm
+   * @param stageDeadline the deadline of the stage set by the two phase
+   *          planning algorithm
+   *
+   * @return The computed allocation (or null if the stage could not be
+   *         allocated)
+   */
+  Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
new file mode 100644
index 0000000..773fbdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Computes the stage allocation according to the greedy allocation rule. The
+ * greedy rule repeatedly allocates requested containers at the rightmost
+ * (latest) free interval.
+ */
+public class StageAllocatorGreedy implements StageAllocator {
+
+  /**
+   * Places the gangs of the stage as late as possible inside
+   * [stageEarliestStart, stageDeadline), moving leftwards whenever the
+   * current window cannot host all the remaining gangs.
+   *
+   * Every placement is also recorded in planModifications; if the stage
+   * cannot be placed entirely, those records are removed again.
+   *
+   * @return the intervals and resources allocated for the stage, or null if
+   *         not all the gangs could be placed
+   */
+  @Override
+  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline) {
+
+    Resource clusterCapacity = plan.getTotalCapacity();
+
+    Map<ReservationInterval, Resource> placed =
+        new HashMap<ReservationInterval, Resource>();
+
+    // a gang is 'concurrency' containers expressed as a single resource
+    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
+
+    // stage duration, ceiled to the next multiple of the plan step
+    long dur = rr.getDuration();
+    long step = plan.getStep();
+    long remainder = dur % step;
+    if (remainder != 0) {
+      dur += (step - remainder);
+    }
+
+    // this division is expected to have no remainder (part of the contract
+    // with the user, validated beforehand)
+    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
+
+    // right edge of the window currently being examined
+    long curDeadline = stageDeadline;
+
+    // keep placing until we are done, or the window falls out of the valid
+    // range of times
+    while (gangsToPlace > 0 && curDeadline - dur >= stageEarliestStart) {
+
+      // while scanning the window we track how many gangs fit at every
+      // instant, and the most constraining instant (the next window will end
+      // there)
+      int fit = gangsToPlace;
+      long tightestPoint = curDeadline;
+      long windowStart = curDeadline - dur;
+
+      // start right before the deadline (excluded due to [,) interval
+      // semantics) and move backward
+      long t = curDeadline - plan.getStep();
+      while (t >= windowStart && fit > 0) {
+
+        // net available resources at t: capacity minus what the plan has
+        // committed, minus what this planning round has tentatively taken
+        Resource free = Resources.clone(clusterCapacity);
+        Resources.subtractFrom(free, plan.getTotalCommittedResources(t));
+        Resources.subtractFrom(free, planModifications.getCapacityAtTime(t));
+
+        // number of gangs that fit at t, capped by the gangs still to place
+        int fitAtT =
+            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
+                clusterCapacity, free, gang));
+        fitAtT = Math.min(gangsToPlace, fitAtT);
+
+        // remember the minimum and *where* it was found
+        if (fitAtT <= fit) {
+          fit = fitAtT;
+          tightestPoint = t;
+        }
+
+        t -= plan.getStep();
+      }
+
+      // if at least one gang fits throughout the window, record the placement
+      if (fit > 0) {
+        gangsToPlace -= fit;
+
+        ReservationInterval window =
+            new ReservationInterval(windowStart, curDeadline);
+        Resource windowRes =
+            Resources.multiply(rr.getCapability(), rr.getConcurrency() * fit);
+
+        // remember the occupied space (the plan is read-only until a
+        // plausible allocation for the entire request is found). Other
+        // ReservationRequests of the same ReservationDefinition may be placed
+        // later, and the available resources must not be double-counted.
+        planModifications.addInterval(window, windowRes);
+        placed.put(window, windowRes);
+      }
+
+      // continue "left" of the most constraining point: nothing on its
+      // "right" can fit a full gang
+      curDeadline = tightestPoint;
+    }
+
+    // every gang has been placed: success
+    if (gangsToPlace == 0) {
+      return placed;
+    }
+
+    // The request could not be satisfied, so remove the unwanted
+    // side-effects from planModifications (needed for ANY).
+    for (Map.Entry<ReservationInterval, Resource> entry : placed.entrySet()) {
+      planModifications.removeInterval(entry.getKey(), entry.getValue());
+    }
+
+    // null signals failure in this allocation
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
new file mode 100644
index 0000000..4b5763d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * A stage allocator that iteratively allocates containers in the
+ * {@link DurationInterval} with lowest overall cost. The algorithm only
+ * considers intervals of the form: [stageDeadline - (n+1)*duration,
+ * stageDeadline - n*duration) for an integer n. This guarantees that the
+ * allocations are aligned (as opposed to overlapping duration intervals).
+ *
+ * The smoothnessFactor parameter controls the number of containers that are
+ * simultaneously allocated in each iteration of the algorithm.
+ */
+public class StageAllocatorLowCostAligned implements StageAllocator {
+
+  // Smoothness factor: the per-iteration gang quota is divided by it
+  private int smoothnessFactor = 10;
+
+  /** Creates an allocator with the default smoothness factor (10). */
+  public StageAllocatorLowCostAligned() {
+  }
+
+  /**
+   * Creates an allocator with the given smoothness factor.
+   *
+   * @param smoothnessFactor divisor applied to the number of gangs allocated
+   *          per iteration
+   */
+  public StageAllocatorLowCostAligned(int smoothnessFactor) {
+    this.smoothnessFactor = smoothnessFactor;
+  }
+
+  /**
+   * Allocates the gangs of the stage into aligned duration intervals, always
+   * picking the interval that currently has the lowest total cost.
+   *
+   * The number of gangs placed in one iteration is bounded by the
+   * per-iteration quota, by the gangs still to place and by the number of
+   * gangs the chosen interval can host without exceeding the capacity.
+   *
+   * Every placement is also recorded in planModifications; if the stage
+   * cannot be placed entirely, those records are removed again.
+   *
+   * @return the computed allocation, or null if the time window cannot hold
+   *         a single duration or not all the gangs could be placed
+   */
+  @Override
+  public Map<ReservationInterval, Resource> computeStageAllocation(
+      Plan plan, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, ReservationRequest rr,
+      long stageEarliestStart, long stageDeadline) {
+
+    // Initialize
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource capacity = plan.getTotalCapacity();
+    long step = plan.getStep();
+
+    // Create allocationRequests
+    RLESparseResourceAllocation allocationRequests =
+        new RLESparseResourceAllocation(resCalc, plan.getMinimumAllocation());
+
+    // Initialize parameters
+    long duration = stepRoundUp(rr.getDuration(), step);
+    int windowSizeInDurations =
+        (int) ((stageDeadline - stageEarliestStart) / duration);
+
+    // If window size is too small, return null. This is checked before the
+    // window size is used as a divisor below.
+    if (windowSizeInDurations <= 0) {
+      return null;
+    }
+
+    int totalGangs = rr.getNumContainers() / rr.getConcurrency();
+    int numContainersPerGang = rr.getConcurrency();
+    Resource gang =
+        Resources.multiply(rr.getCapability(), numContainersPerGang);
+
+    // Set maxGangsPerUnit
+    int maxGangsPerUnit =
+        (int) Math.max(
+            Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+    maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
+
+    // Initialize tree sorted by costs; ties are broken in favor of the
+    // interval with the later end time
+    TreeSet<DurationInterval> durationIntervalsSortedByCost =
+        new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
+          @Override
+          public int compare(DurationInterval val1, DurationInterval val2) {
+
+            int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
+            if (cmp != 0) {
+              return cmp;
+            }
+
+            return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+          }
+        });
+
+    // Add durationIntervals that end at (endTime - n*duration) for some n.
+    for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+        + duration; intervalEnd -= duration) {
+
+      long intervalStart = intervalEnd - duration;
+
+      // Get duration interval [intervalStart,intervalEnd)
+      DurationInterval durationInterval =
+          getDurationInterval(intervalStart, intervalEnd, planLoads,
+              planModifications, capacity, resCalc, step);
+
+      // If the interval can fit a gang, add it to the tree
+      if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+        durationIntervalsSortedByCost.add(durationInterval);
+      }
+    }
+
+    // Allocate
+    int remainingGangs = totalGangs;
+    while (remainingGangs > 0) {
+
+      // If no durationInterval can fit a gang, break and return null
+      if (durationIntervalsSortedByCost.isEmpty()) {
+        break;
+      }
+
+      // Take the best duration interval out of the tree; it is re-inserted
+      // below with its updated cost if it can still host a gang
+      DurationInterval bestDurationInterval =
+          durationIntervalsSortedByCost.pollFirst();
+
+      // Never place more gangs than the interval can actually host: being in
+      // the tree only guarantees that a single gang fits
+      int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
+      numGangsToAllocate =
+          Math.min(numGangsToAllocate,
+              bestDurationInterval.numCanFit(gang, capacity, resCalc));
+
+      // Defensive: if nothing fits after all, leave the interval out of the
+      // tree so that the loop always makes progress
+      if (numGangsToAllocate <= 0) {
+        continue;
+      }
+
+      // Add it
+      remainingGangs -= numGangsToAllocate;
+
+      ReservationInterval reservationInt =
+          new ReservationInterval(bestDurationInterval.getStartTime(),
+              bestDurationInterval.getEndTime());
+
+      Resource reservationRes =
+          Resources.multiply(rr.getCapability(), rr.getConcurrency()
+              * numGangsToAllocate);
+
+      planModifications.addInterval(reservationInt, reservationRes);
+      allocationRequests.addInterval(reservationInt, reservationRes);
+
+      // Get updated interval (cost and load now include this placement)
+      DurationInterval updatedDurationInterval =
+          getDurationInterval(bestDurationInterval.getStartTime(),
+              bestDurationInterval.getStartTime() + duration, planLoads,
+              planModifications, capacity, resCalc, step);
+
+      // Add to tree, if possible
+      if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+        durationIntervalsSortedByCost.add(updatedDurationInterval);
+      }
+
+    }
+
+    // Get the final allocation
+    Map<ReservationInterval, Resource> allocations =
+        allocationRequests.toIntervalMap();
+
+    // If no gangs are left to place we succeed and return the allocation
+    if (remainingGangs <= 0) {
+      return allocations;
+    } else {
+
+      // If we are here is because we did not manage to satisfy this request.
+      // We remove unwanted side-effect from planModifications (needed for ANY).
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation
+          : allocations.entrySet()) {
+
+        planModifications.removeInterval(tempAllocation.getKey(),
+            tempAllocation.getValue());
+
+      }
+      // Return null to signal failure in this allocation
+      return null;
+
+    }
+
+  }
+
+  /**
+   * Builds the {@link DurationInterval} for [startTime, endTime): the total
+   * cost is the sum of the costs of the loads sampled every step, and the
+   * maximal load is the component-wise maximum of those loads.
+   */
+  protected DurationInterval getDurationInterval(long startTime, long endTime,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc, long step) {
+
+    // Initialize the dominant loads structure
+    Resource dominantResources = Resource.newInstance(0, 0);
+
+    // Calculate totalCost and maxLoad
+    double totalCost = 0.0;
+    for (long t = startTime; t < endTime; t += step) {
+
+      // Get the load
+      Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+      // Increase the total cost
+      totalCost += calcCostOfLoad(load, capacity, resCalc);
+
+      // Update the dominant resources
+      dominantResources = Resources.componentwiseMax(dominantResources, load);
+
+    }
+
+    // Return the corresponding durationInterval
+    return new DurationInterval(startTime, endTime, totalCost,
+        dominantResources);
+
+  }
+
+  /**
+   * Sums the costs of the time slots in [startTime, endTime), sampled every
+   * step.
+   */
+  protected double calcCostOfInterval(long startTime, long endTime,
+      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc, long step) {
+
+    // Sum costs in the interval [startTime,endTime)
+    double totalCost = 0.0;
+    for (long t = startTime; t < endTime; t += step) {
+      totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
+          resCalc);
+    }
+
+    // Return sum
+    return totalCost;
+
+  }
+
+  /** Returns the cost of the load at time t. */
+  protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc) {
+
+    // Get the current load at time t
+    Resource load = getLoadAtTime(t, planLoads, planModifications);
+
+    // Return cost
+    return calcCostOfLoad(load, capacity, resCalc);
+
+  }
+
+  /**
+   * Returns the load at time t: the entry of planLoads for t (a zero
+   * resource when there is none) plus the capacity recorded in
+   * planModifications at t.
+   */
+  protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planModifications) {
+
+    Resource planLoad = planLoads.get(t);
+    planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+
+    return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
+
+  }
+
+  /**
+   * Returns the cost of a load, defined as the ratio between load and
+   * capacity computed by the resource calculator.
+   */
+  protected double calcCostOfLoad(Resource load, Resource capacity,
+      ResourceCalculator resCalc) {
+
+    return resCalc.ratio(load, capacity);
+
+  }
+
+  /**
+   * Rounds t to a multiple of step by discarding the remainder of t / step.
+   */
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  /**
+   * Rounds t up to a multiple of step; exact multiples are returned
+   * unchanged.
+   */
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+
+  /**
+   * An inner class that represents an interval, typically of length duration.
+   * The class holds the total cost of the interval and the maximal load inside
+   * the interval in each dimension (both calculated externally).
+   */
+  protected static class DurationInterval {
+
+    private long startTime;
+    private long endTime;
+    private double cost;
+    private Resource maxLoad;
+
+    /**
+     * @param startTime start of the interval (included)
+     * @param endTime end of the interval (excluded)
+     * @param cost total cost of the interval
+     * @param maxLoad maximal load inside the interval, in each dimension
+     */
+    public DurationInterval(long startTime, long endTime, double cost,
+        Resource maxLoad) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.cost = cost;
+      this.maxLoad = maxLoad;
+    }
+
+    /**
+     * Returns whether requestedResources can be allocated during the
+     * durationInterval without violating capacity constraints, i.e., whether
+     * maxLoad plus requestedResources does not compare greater than capacity
+     * according to the resource calculator.
+     */
+    public boolean canAllocate(Resource requestedResources, Resource capacity,
+        ResourceCalculator resCalc) {
+
+      Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
+      return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
+
+    }
+
+    /**
+     * Returns the maximal number of requestedResources that can be allocated
+     * during the durationInterval without violating capacity constraints.
+     */
+    public int numCanFit(Resource requestedResources, Resource capacity,
+        ResourceCalculator resCalc) {
+
+      // Represents the largest resource demand that can be satisfied throughout
+      // the entire DurationInterval (i.e., during [startTime,endTime))
+      Resource availableResources = Resources.subtract(capacity, maxLoad);
+
+      // Maximal number of requestedResources that fit inside the interval
+      return (int) Math.floor(Resources.divide(resCalc, capacity,
+          availableResources, requestedResources));
+
+    }
+
+    public long getStartTime() {
+      return this.startTime;
+    }
+
+    public void setStartTime(long value) {
+      this.startTime = value;
+    }
+
+    public long getEndTime() {
+      return this.endTime;
+    }
+
+    public void setEndTime(long value) {
+      this.endTime = value;
+    }
+
+    public Resource getMaxLoad() {
+      return this.maxLoad;
+    }
+
+    public void setMaxLoad(Resource value) {
+      this.maxLoad = value;
+    }
+
+    public double getTotalCost() {
+      return this.cost;
+    }
+
+    public void setTotalCost(double value) {
+      this.cost = value;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
new file mode 100644
index 0000000..547616a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Interface for setting the earliest start time of a stage in IterativePlanner.
+ */
+public interface StageEarliestStart {
+
+  /**
+   * Computes the earliest allowed starting time for a given stage.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param reservation the job contract
+   * @param index the index of the stage in the job contract
+   * @param currentReservationStage the stage
+   * @param stageDeadline the deadline of the stage set by the two phase
+   *          planning algorithm
+   *
+   * @return the earliest allowed starting time for the stage.
+   */
+  long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
+          int index, ReservationRequest currentReservationStage,
+          long stageDeadline);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
new file mode 100644
index 0000000..5a46a4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.ListIterator;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage proportional to the job weight. The
+ * interval [jobArrival, stageDeadline) is divided as follows. First, each stage
+ * is guaranteed at least its requested duration. Then, the stage receives a
+ * fraction of the remaining time. The fraction is calculated as the ratio
+ * between the weight (total requested resources) of the stage and the total
+ * weight of this stage and all preceding stages.
+ */
+
+public class StageEarliestStartByDemand implements StageEarliestStart {
+
+  private long step;
+
+  @Override
+  public long setEarliestStartTime(Plan plan,
+      ReservationDefinition reservation, int index, ReservationRequest current,
+      long stageDeadline) {
+
+    step = plan.getStep();
+
+    // If this is the first stage, don't bother with the computation.
+    if (index < 1) {
+      return reservation.getArrival();
+    }
+
+    // Get iterator
+    ListIterator<ReservationRequest> li =
+        reservation.getReservationRequests().getReservationResources()
+            .listIterator(index);
+    ReservationRequest rr;
+
+    // Calculate the total weight & total duration
+    double totalWeight = calcWeight(current);
+    long totalDuration = getRoundedDuration(current, plan);
+
+    while (li.hasPrevious()) {
+      rr = li.previous();
+      totalWeight += calcWeight(rr);
+      totalDuration += getRoundedDuration(rr, plan);
+    }
+
+    // Compute the current stage's weight as a fraction of the total above
+    double ratio = calcWeight(current) / totalWeight;
+
+    // Estimate an early start time, such that:
+    // 1. Every stage is guaranteed to receive at least its duration
+    // 2. The remainder of the window is divided between stages
+    // proportionally to their workload (total memory consumption)
+    long window = stageDeadline - reservation.getArrival();
+    long windowRemainder = window - totalDuration;
+    long earlyStart =
+        (long) (stageDeadline - getRoundedDuration(current, plan)
+            - (windowRemainder * ratio));
+
+    // Realign if necessary (since we did some arithmetic)
+    earlyStart = stepRoundUp(earlyStart, step);
+
+    // Return
+    return earlyStart;
+
+  }
+
+  // Weight = total memory consumption of stage
+  protected double calcWeight(ReservationRequest stage) {
+    return (stage.getDuration() * stage.getCapability().getMemory())
+        * (stage.getNumContainers());
+  }
+
+  protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
+    return stepRoundUp(stage.getDuration(), step);
+  }
+
+  protected static long stepRoundDown(long t, long step) {
+    return (t / step) * step;
+  }
+
+  protected static long stepRoundUp(long t, long step) {
+    return ((t + step - 1) / step) * step;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
new file mode 100644
index 0000000..8347816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+
+/**
+ * Sets the earliest start time of a stage as the job arrival time.
+ */
+public class StageEarliestStartByJobArrival implements StageEarliestStart {
+
+  @Override
+  public long setEarliestStartTime(Plan plan,
+      ReservationDefinition reservation, int index, ReservationRequest current,
+      long stageDeadline) {
+
+    return reservation.getArrival();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
new file mode 100644
index 0000000..1d37ce5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+
+/**
+ * A planning algorithm that invokes several other planning algorithms according
+ * to a given order. If one of the planners succeeds, the allocation it
+ * generates is returned.
+ */
+public class TryManyReservationAgents implements ReservationAgent {
+
+  // Planning algorithms
+  private final List<ReservationAgent> algs;
+
+  // Constructor
+  public TryManyReservationAgents(List<ReservationAgent> algs) {
+    this.algs = new LinkedList<ReservationAgent>(algs);
+  }
+
+  @Override
+  public boolean createReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Save the planning exception
+    PlanningException planningException = null;
+
+    // Try all of the algorithms, in order
+    for (ReservationAgent alg : algs) {
+
+      try {
+        if (alg.createReservation(reservationId, user, plan, contract)) {
+          return true;
+        }
+      } catch (PlanningException e) {
+        planningException = e;
+      }
+
+    }
+
+    // If all of the algorithms failed and one of the algorithms threw an
+    // exception, throw the last planning exception
+    if (planningException != null) {
+      throw planningException;
+    }
+
+    // If all of the algorithms failed, return false
+    return false;
+
+  }
+
+  @Override
+  public boolean updateReservation(ReservationId reservationId, String user,
+      Plan plan, ReservationDefinition contract) throws PlanningException {
+
+    // Save the planning exception
+    PlanningException planningException = null;
+
+    // Try all of the algorithms, in order
+    for (ReservationAgent alg : algs) {
+
+      try {
+        if (alg.updateReservation(reservationId, user, plan, contract)) {
+          return true;
+        }
+      } catch (PlanningException e) {
+        planningException = e;
+      }
+
+    }
+
+    // If all of the algorithms failed and one of the algorithms threw an
+    // exception, throw the last planning exception
+    if (planningException != null) {
+      throw planningException;
+    }
+
+    // If all of the algorithms failed, return false
+    return false;
+
+  }
+
+  @Override
+  public boolean deleteReservation(ReservationId reservationId, String user,
+      Plan plan) throws PlanningException {
+
+    return plan.deleteReservation(reservationId);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index be1d69a..adb9dcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -89,7 +90,7 @@ public class ReservationSystemTestUtil {
     Assert.assertEquals(planQName, plan.getQueueName());
     Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
     Assert.assertTrue(
-        plan.getReservationAgent() instanceof GreedyReservationAgent);
+        plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
     Assert.assertTrue(
         plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }
@@ -102,7 +103,7 @@ public class ReservationSystemTestUtil {
     Assert.assertEquals(newQ, newPlan.getQueueName());
     Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
     Assert
-        .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+        .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
     Assert
         .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 19f876d..f608c3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
-
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index b8663f6..15f9a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
index f294eaf..4b685b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26ea0458/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
index e9a4f50..43316f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;